From 5199299f32d77a7bd7e457d261c255673e825a1f Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sun, 23 Nov 2025 11:27:12 +0000 Subject: [PATCH 01/11] Setting up GitHub Classroom Feedback From 7fad092890883580dc3b87d878073124033996a6 Mon Sep 17 00:00:00 2001 From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com> Date: Sun, 23 Nov 2025 11:27:14 +0000 Subject: [PATCH 02/11] add deadline --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 97f8c85..98999e5 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/QODoQuhO) # Распределенная обработка текстовых данных с использованием брокера сообщений ## Цель задания: From 6f3b5cac565a30ce74d3f79f23ab40155ffaeecd Mon Sep 17 00:00:00 2001 From: railolog Date: Sun, 23 Nov 2025 18:52:35 +0300 Subject: [PATCH 03/11] init rabbit and producer --- .gitignore | 3 + .idea/.gitignore | 8 - .idea/compiler.xml | 6 - .idea/gradle.xml | 15 -- .idea/jarRepositories.xml | 20 -- .idea/kotlinc.xml | 6 - .idea/misc.xml | 10 - .idea/modules.xml | 9 - .idea/modules/lab_3.main.iml | 8 - .idea/modules/lab_3.test.iml | 8 - .idea/vcs.xml | 6 - build.gradle.kts | 29 -- docker-compose.yml | 7 + gradlew | 249 ------------------ gradlew.bat | 92 ------- producer/pom.xml | 70 +++++ producer/src/main/java/ru/ifmo/App.java | 11 + producer/src/main/java/ru/ifmo/AppConfig.java | 12 + .../main/java/ru/ifmo/HelloController.java | 34 +++ .../ru/ifmo/configuration/RabbitConfig.java | 17 ++ .../src/main/resources/application.properties | 6 + producer/src/main/resources/logback.xml | 7 + .../src/main/resources/templates/hello.html | 12 + producer/src/test/java/ru/ifmo/.nop | 0 producer/src/test/resources/.gitkeep | 0 src/main/java/Main.java | 5 - 26 files changed, 179 insertions(+), 471 deletions(-) create mode 100644 .gitignore delete mode 100644 .idea/.gitignore delete mode 100644 .idea/compiler.xml delete mode 100644 .idea/gradle.xml delete mode 100644 .idea/jarRepositories.xml delete mode 100644 .idea/kotlinc.xml delete mode 100644 .idea/misc.xml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/modules/lab_3.main.iml delete mode 100644 .idea/modules/lab_3.test.iml delete mode 100644 .idea/vcs.xml delete mode 100644 build.gradle.kts create mode 100644 docker-compose.yml delete mode 100755 gradlew delete mode 100644 gradlew.bat create mode 100644 producer/pom.xml create mode 100644 producer/src/main/java/ru/ifmo/App.java create mode 100644 producer/src/main/java/ru/ifmo/AppConfig.java create mode 100644 producer/src/main/java/ru/ifmo/HelloController.java create mode 100644 producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java create mode 100644 producer/src/main/resources/application.properties create mode 100644 producer/src/main/resources/logback.xml create mode 100644 producer/src/main/resources/templates/hello.html create mode 100644 producer/src/test/java/ru/ifmo/.nop create mode 100644 producer/src/test/resources/.gitkeep delete mode 100644 src/main/java/Main.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ffd58ad --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/producer/target/ +/.idea/yatool.xml +.idea/ diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/compiler.xml b/.idea/compiler.xml deleted file mode 100644 index 61a9130..0000000 --- a/.idea/compiler.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/gradle.xml b/.idea/gradle.xml deleted file mode 100644 index f9163b4..0000000 --- a/.idea/gradle.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml deleted file mode 100644 index fdc392f..0000000 --- a/.idea/jarRepositories.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/kotlinc.xml b/.idea/kotlinc.xml deleted file mode 100644 index e805548..0000000 --- a/.idea/kotlinc.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml deleted file mode 100644 index 2dae0dd..0000000 --- a/.idea/misc.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index 5c567f2..0000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/lab_3.main.iml b/.idea/modules/lab_3.main.iml deleted file mode 100644 index d6ff951..0000000 --- a/.idea/modules/lab_3.main.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/modules/lab_3.test.iml b/.idea/modules/lab_3.test.iml deleted file mode 100644 index 8210e2d..0000000 --- a/.idea/modules/lab_3.test.iml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1dd..0000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts deleted file mode 100644 index 8e91d96..0000000 --- a/build.gradle.kts +++ /dev/null @@ -1,29 +0,0 @@ -plugins { - kotlin("jvm") version "1.9.20" - application -} - -group = "org.itmo" -version = "1.0-SNAPSHOT" - -repositories { - mavenCentral() -} - -dependencies { - implementation("javax.jms:jms-api:2.0.1") - implementation("org.apache.activemq:activemq-broker:6.1.1") - testImplementation(kotlin("test")) -} - -tasks.test { - useJUnitPlatform() -} - -kotlin { - jvmToolchain(8) -} - -application { - mainClass.set("MainKt") -} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..de2ba73 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,7 @@ +services: + rabbitmq: + image: rabbitmq:3.10.7-management + hostname: rabbitmq + ports: + - "15672:15672" + - "5672:5672" diff --git a/gradlew b/gradlew deleted file mode 100755 index 1aa94a4..0000000 --- a/gradlew +++ /dev/null @@ -1,249 +0,0 @@ -#!/bin/sh - -# -# Copyright © 2015-2021 the original authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -############################################################################## -# -# Gradle start up script for POSIX generated by Gradle. -# -# Important for running: -# -# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is -# noncompliant, but you have some other compliant shell such as ksh or -# bash, then to run this script, type that shell name before the whole -# command line, like: -# -# ksh Gradle -# -# Busybox and similar reduced shells will NOT work, because this script -# requires all of these POSIX shell features: -# * functions; -# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», -# «${var#prefix}», «${var%suffix}», and «$( cmd )»; -# * compound commands having a testable exit status, especially «case»; -# * various built-in commands including «command», «set», and «ulimit». -# -# Important for patching: -# -# (2) This script targets any POSIX shell, so it avoids extensions provided -# by Bash, Ksh, etc; in particular arrays are avoided. -# -# The "traditional" practice of packing multiple parameters into a -# space-separated string is a well documented source of bugs and security -# problems, so this is (mostly) avoided, by progressively accumulating -# options in "$@", and eventually passing that to Java. -# -# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, -# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; -# see the in-line comments for details. -# -# There are tweaks for specific operating systems such as AIX, CygWin, -# Darwin, MinGW, and NonStop. -# -# (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt -# within the Gradle project. -# -# You can find Gradle at https://github.com/gradle/gradle/. -# -############################################################################## - -# Attempt to set APP_HOME - -# Resolve links: $0 may be a link -app_path=$0 - -# Need this for daisy-chained symlinks. -while - APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path - [ -h "$app_path" ] -do - ls=$( ls -ld "$app_path" ) - link=${ls#*' -> '} - case $link in #( - /*) app_path=$link ;; #( - *) app_path=$APP_HOME$link ;; - esac -done - -# This is normally unused -# shellcheck disable=SC2034 -APP_BASE_NAME=${0##*/} -# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) -APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit - -# Use the maximum available, or set MAX_FD != -1 to use that value. -MAX_FD=maximum - -warn () { - echo "$*" -} >&2 - -die () { - echo - echo "$*" - echo - exit 1 -} >&2 - -# OS specific support (must be 'true' or 'false'). -cygwin=false -msys=false -darwin=false -nonstop=false -case "$( uname )" in #( - CYGWIN* ) cygwin=true ;; #( - Darwin* ) darwin=true ;; #( - MSYS* | MINGW* ) msys=true ;; #( - NONSTOP* ) nonstop=true ;; -esac - -CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar - - -# Determine the Java command to use to start the JVM. -if [ -n "$JAVA_HOME" ] ; then - if [ -x "$JAVA_HOME/jre/sh/java" ] ; then - # IBM's JDK on AIX uses strange locations for the executables - JAVACMD=$JAVA_HOME/jre/sh/java - else - JAVACMD=$JAVA_HOME/bin/java - fi - if [ ! -x "$JAVACMD" ] ; then - die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi -else - JAVACMD=java - if ! command -v java >/dev/null 2>&1 - then - die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. - -Please set the JAVA_HOME variable in your environment to match the -location of your Java installation." - fi -fi - -# Increase the maximum file descriptors if we can. -if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then - case $MAX_FD in #( - max*) - # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 - MAX_FD=$( ulimit -H -n ) || - warn "Could not query maximum file descriptor limit" - esac - case $MAX_FD in #( - '' | soft) :;; #( - *) - # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC2039,SC3045 - ulimit -n "$MAX_FD" || - warn "Could not set maximum file descriptor limit to $MAX_FD" - esac -fi - -# Collect all arguments for the java command, stacking in reverse order: -# * args from the command line -# * the main class name -# * -classpath -# * -D...appname settings -# * --module-path (only if needed) -# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. - -# For Cygwin or MSYS, switch paths to Windows format before running java -if "$cygwin" || "$msys" ; then - APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) - CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) - - JAVACMD=$( cygpath --unix "$JAVACMD" ) - - # Now convert the arguments - kludge to limit ourselves to /bin/sh - for arg do - if - case $arg in #( - -*) false ;; # don't mess with options #( - /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath - [ -e "$t" ] ;; #( - *) false ;; - esac - then - arg=$( cygpath --path --ignore --mixed "$arg" ) - fi - # Roll the args list around exactly as many times as the number of - # args, so each arg winds up back in the position where it started, but - # possibly modified. - # - # NB: a `for` loop captures its iteration list before it begins, so - # changing the positional parameters here affects neither the number of - # iterations, nor the values presented in `arg`. - shift # remove old arg - set -- "$@" "$arg" # push replacement arg - done -fi - - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' - -# Collect all arguments for the java command: -# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, -# and any embedded shellness will be escaped. -# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be -# treated as '${Hostname}' itself on the command line. - -set -- \ - "-Dorg.gradle.appname=$APP_BASE_NAME" \ - -classpath "$CLASSPATH" \ - org.gradle.wrapper.GradleWrapperMain \ - "$@" - -# Stop when "xargs" is not available. -if ! command -v xargs >/dev/null 2>&1 -then - die "xargs is not available" -fi - -# Use "xargs" to parse quoted args. -# -# With -n1 it outputs one arg per line, with the quotes and backslashes removed. -# -# In Bash we could simply go: -# -# readarray ARGS < <( xargs -n1 <<<"$var" ) && -# set -- "${ARGS[@]}" "$@" -# -# but POSIX shell has neither arrays nor command substitution, so instead we -# post-process each arg (as a line of input to sed) to backslash-escape any -# character that might be a shell metacharacter, then use eval to reverse -# that process (while maintaining the separation between arguments), and wrap -# the whole thing up as a single "set" statement. -# -# This will of course break if any of these variables contains a newline or -# an unmatched quote. -# - -eval "set -- $( - printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | - xargs -n1 | - sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | - tr '\n' ' ' - )" '"$@"' - -exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat deleted file mode 100644 index 93e3f59..0000000 --- a/gradlew.bat +++ /dev/null @@ -1,92 +0,0 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%"=="" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%"=="" set DIRNAME=. -@rem This is normally unused -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if %ERRORLEVEL% equ 0 goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega diff --git a/producer/pom.xml b/producer/pom.xml new file mode 100644 index 0000000..abdd6d4 --- /dev/null +++ b/producer/pom.xml @@ -0,0 +1,70 @@ + + + 4.0.0 + + ru.ifmo + producer + 1.0 + jar + + Producer + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.28 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + + diff --git a/producer/src/main/java/ru/ifmo/App.java b/producer/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/producer/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/AppConfig.java b/producer/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..1c1062d --- /dev/null +++ b/producer/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,12 @@ +package ru.ifmo; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.sql.DataSource; + +@Configuration +public class AppConfig { +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/HelloController.java b/producer/src/main/java/ru/ifmo/HelloController.java new file mode 100644 index 0000000..ee1ade2 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/HelloController.java @@ -0,0 +1,34 @@ +package ru.ifmo; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class HelloController { + + private final RabbitTemplate rabbitTemplate; + private final String exchangeName; + + public HelloController( + @Value("${rabbitmq.exchange.name}") String exchangeName, + RabbitTemplate rabbitTemplate + ) { + this.rabbitTemplate = rabbitTemplate; + this.exchangeName = exchangeName; + } + + @RequestMapping("/") + String hello() { + return "Hello World!"; + } + + @RequestMapping("/send") + public void send( + @RequestParam("message") String message + ) { + rabbitTemplate.convertAndSend(exchangeName, "", message); + } +} diff --git a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..956647a --- /dev/null +++ b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,17 @@ +package ru.ifmo.configuration; + +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + + @Bean + public FanoutExchange fanoutExchange( + @Value("${rabbitmq.exchange.name}") String exchangeName + ) { + return new FanoutExchange(exchangeName); + } +} diff --git a/producer/src/main/resources/application.properties b/producer/src/main/resources/application.properties new file mode 100644 index 0000000..9eb722b --- /dev/null +++ b/producer/src/main/resources/application.properties @@ -0,0 +1,6 @@ +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +rabbitmq.exchange.name=textTopic diff --git a/producer/src/main/resources/logback.xml b/producer/src/main/resources/logback.xml new file mode 100644 index 0000000..c53c4a5 --- /dev/null +++ b/producer/src/main/resources/logback.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/producer/src/main/resources/templates/hello.html b/producer/src/main/resources/templates/hello.html new file mode 100644 index 0000000..8cb9705 --- /dev/null +++ b/producer/src/main/resources/templates/hello.html @@ -0,0 +1,12 @@ + + + + + + + +

+ Hello! +

+ + \ No newline at end of file diff --git a/producer/src/test/java/ru/ifmo/.nop b/producer/src/test/java/ru/ifmo/.nop new file mode 100644 index 0000000..e69de29 diff --git a/producer/src/test/resources/.gitkeep b/producer/src/test/resources/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/src/main/java/Main.java b/src/main/java/Main.java deleted file mode 100644 index 477e8b9..0000000 --- a/src/main/java/Main.java +++ /dev/null @@ -1,5 +0,0 @@ -public class Main { - public static void main(String[] args) { - System.out.println("Lab 3"); - } -} From 625578ddcb9af3fe3f399789a924c5077d53d2fa Mon Sep 17 00:00:00 2001 From: railolog Date: Sun, 23 Nov 2025 19:14:59 +0300 Subject: [PATCH 04/11] add swagger --- producer/pom.xml | 7 ++++++- .../main/java/ru/ifmo/HelloController.java | 16 +++++++++++++++ .../ru/ifmo/configuration/SwaggerConfig.java | 20 +++++++++++++++++++ .../src/main/resources/application.properties | 9 +++++++++ 4 files changed, 51 insertions(+), 1 deletion(-) create mode 100644 producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java diff --git a/producer/pom.xml b/producer/pom.xml index abdd6d4..2c1aed0 100644 --- a/producer/pom.xml +++ b/producer/pom.xml @@ -19,7 +19,7 @@ UTF-8 ru.ifmo.App 21 - 1.18.28 + 1.18.30 @@ -37,6 +37,11 @@ ${lombok.version} provided + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + diff --git a/producer/src/main/java/ru/ifmo/HelloController.java b/producer/src/main/java/ru/ifmo/HelloController.java index ee1ade2..e53b0f0 100644 --- a/producer/src/main/java/ru/ifmo/HelloController.java +++ b/producer/src/main/java/ru/ifmo/HelloController.java @@ -1,5 +1,10 @@ package ru.ifmo; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.responses.ApiResponses; +import io.swagger.v3.oas.annotations.tags.Tag; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.web.bind.annotation.RequestMapping; @@ -7,6 +12,7 @@ import org.springframework.web.bind.annotation.RestController; @RestController +@Tag(name = "Producer Controller", description = "API for message production and basic operations") public class HelloController { private final RabbitTemplate rabbitTemplate; @@ -21,12 +27,22 @@ public HelloController( } @RequestMapping("/") + @Operation(summary = "Get greeting message", description = "Returns a simple greeting message") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Successfully retrieved greeting message") + }) String hello() { return "Hello World!"; } @RequestMapping("/send") + @Operation(summary = "Send message to RabbitMQ", description = "Sends a message to the configured RabbitMQ exchange") + @ApiResponses(value = { + @ApiResponse(responseCode = "200", description = "Message sent successfully"), + @ApiResponse(responseCode = "400", description = "Invalid message parameter") + }) public void send( + @Parameter(description = "Message to be sent to RabbitMQ queue", required = true) @RequestParam("message") String message ) { rabbitTemplate.convertAndSend(exchangeName, "", message); diff --git a/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java b/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java new file mode 100644 index 0000000..6dc8702 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/configuration/SwaggerConfig.java @@ -0,0 +1,20 @@ +package ru.ifmo.configuration; + +import io.swagger.v3.oas.models.OpenAPI; +import io.swagger.v3.oas.models.info.Info; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class SwaggerConfig { + + @Bean + public OpenAPI customOpenAPI() { + return new OpenAPI() + .info( + new Info() + .title("Producer API") + .version("1.0") + ); + } +} \ No newline at end of file diff --git a/producer/src/main/resources/application.properties b/producer/src/main/resources/application.properties index 9eb722b..2f0d3bb 100644 --- a/producer/src/main/resources/application.properties +++ b/producer/src/main/resources/application.properties @@ -1,6 +1,15 @@ +server.port=8080 + spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest rabbitmq.exchange.name=textTopic + +# Swagger/OpenAPI Configuration +springdoc.api-docs.path=/api-docs +springdoc.swagger-ui.path=/swagger-ui.html +springdoc.swagger-ui.operationsSorter=method +springdoc.swagger-ui.tagsSorter=alpha +springdoc.swagger-ui.tryItOutEnabled=true From a402aab69671739ba7dcaab1aa1c0490abf15d4a Mon Sep 17 00:00:00 2001 From: railolog Date: Sun, 23 Nov 2025 21:09:16 +0300 Subject: [PATCH 05/11] init consumer --- .gitignore | 1 + worker/pom.xml | 74 +++++++++++++++++++ worker/src/main/java/ru/ifmo/App.java | 11 +++ worker/src/main/java/ru/ifmo/AppConfig.java | 7 ++ .../main/java/ru/ifmo/MessagesListener.java | 13 ++++ .../ru/ifmo/configuration/RabbitConfig.java | 35 +++++++++ .../src/main/resources/application.properties | 9 +++ 7 files changed, 150 insertions(+) create mode 100644 worker/pom.xml create mode 100644 worker/src/main/java/ru/ifmo/App.java create mode 100644 worker/src/main/java/ru/ifmo/AppConfig.java create mode 100644 worker/src/main/java/ru/ifmo/MessagesListener.java create mode 100644 worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java create mode 100644 worker/src/main/resources/application.properties diff --git a/.gitignore b/.gitignore index ffd58ad..0a28c18 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /producer/target/ /.idea/yatool.xml .idea/ +/worker/target/ diff --git a/worker/pom.xml b/worker/pom.xml new file mode 100644 index 0000000..a95c775 --- /dev/null +++ b/worker/pom.xml @@ -0,0 +1,74 @@ + + 4.0.0 + + ru.ifmo + worker + 1.0 + jar + + Worker + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.30 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + diff --git a/worker/src/main/java/ru/ifmo/App.java b/worker/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/worker/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/AppConfig.java b/worker/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..af5370e --- /dev/null +++ b/worker/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,7 @@ +package ru.ifmo; + +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AppConfig { +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/MessagesListener.java b/worker/src/main/java/ru/ifmo/MessagesListener.java new file mode 100644 index 0000000..3eb9484 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/MessagesListener.java @@ -0,0 +1,13 @@ +package ru.ifmo; + +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +@Component +public class MessagesListener { + + @RabbitListener(queues = "${rabbitmq.queue.name}") + public void receiveMessage(String message) { + System.out.println("Received message: " + message); + } +} diff --git a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..fddf89f --- /dev/null +++ b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,35 @@ +package ru.ifmo.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + + @Bean + public FanoutExchange fanoutExchange( + @Value("${rabbitmq.exchange.name}") String exchangeName + ) { + return new FanoutExchange(exchangeName); + } + + @Bean + public Queue queue( + @Value("${rabbitmq.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding binding( + Queue queue, + FanoutExchange fanoutExchange + ) { + return BindingBuilder.bind(queue).to(fanoutExchange); + } +} diff --git a/worker/src/main/resources/application.properties b/worker/src/main/resources/application.properties new file mode 100644 index 0000000..0a62dc2 --- /dev/null +++ b/worker/src/main/resources/application.properties @@ -0,0 +1,9 @@ +server.port=${PORT:8081} + +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +rabbitmq.exchange.name=textTopic +rabbitmq.queue.name=workerQueue From 4b7cd8d792bdf4b4fe2e44898c1173740f8d6260 Mon Sep 17 00:00:00 2001 From: railolog Date: Sun, 23 Nov 2025 21:34:10 +0300 Subject: [PATCH 06/11] worker impl --- producer/src/main/java/ru/ifmo/AppConfig.java | 5 - worker/pom.xml | 8 ++ .../main/java/ru/ifmo/MessagesListener.java | 51 +++++++- .../ru/ifmo/configuration/RabbitConfig.java | 42 +++++- .../ru/ifmo/controller/TestController.java | 35 +++++ .../ru/ifmo/dto/TextProcessingResult.java | 23 ++++ .../src/main/java/ru/ifmo/dto/TextTask.java | 12 ++ .../ifmo/service/NameReplacementService.java | 84 ++++++++++++ .../ifmo/service/SentenceSortingService.java | 78 ++++++++++++ .../service/SentimentAnalysisService.java | 120 ++++++++++++++++++ .../ifmo/service/TextProcessingService.java | 63 +++++++++ .../java/ru/ifmo/service/TopWordsService.java | 46 +++++++ .../ru/ifmo/service/WordCountService.java | 26 ++++ .../src/main/resources/application.properties | 5 + .../resources/sentiment/negative-words.txt | 42 ++++++ .../resources/sentiment/positive-words.txt | 41 ++++++ 16 files changed, 671 insertions(+), 10 deletions(-) create mode 100644 worker/src/main/java/ru/ifmo/controller/TestController.java create mode 100644 worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java create mode 100644 worker/src/main/java/ru/ifmo/dto/TextTask.java create mode 100644 worker/src/main/java/ru/ifmo/service/NameReplacementService.java create mode 100644 worker/src/main/java/ru/ifmo/service/SentenceSortingService.java create mode 100644 worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java create mode 100644 worker/src/main/java/ru/ifmo/service/TextProcessingService.java create mode 100644 worker/src/main/java/ru/ifmo/service/TopWordsService.java create mode 100644 worker/src/main/java/ru/ifmo/service/WordCountService.java create mode 100644 worker/src/main/resources/sentiment/negative-words.txt create mode 100644 worker/src/main/resources/sentiment/positive-words.txt diff --git a/producer/src/main/java/ru/ifmo/AppConfig.java b/producer/src/main/java/ru/ifmo/AppConfig.java index 1c1062d..af5370e 100644 --- a/producer/src/main/java/ru/ifmo/AppConfig.java +++ b/producer/src/main/java/ru/ifmo/AppConfig.java @@ -1,12 +1,7 @@ package ru.ifmo; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.sql.DataSource; - @Configuration public class AppConfig { } \ No newline at end of file diff --git a/worker/pom.xml b/worker/pom.xml index a95c775..e64b41d 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -42,6 +42,14 @@ springdoc-openapi-starter-webmvc-ui 2.2.0 + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + diff --git a/worker/src/main/java/ru/ifmo/MessagesListener.java b/worker/src/main/java/ru/ifmo/MessagesListener.java index 3eb9484..c5aeda5 100644 --- a/worker/src/main/java/ru/ifmo/MessagesListener.java +++ b/worker/src/main/java/ru/ifmo/MessagesListener.java @@ -1,13 +1,62 @@ package ru.ifmo; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import ru.ifmo.dto.TextTask; +import ru.ifmo.dto.TextProcessingResult; +import ru.ifmo.service.TextProcessingService; @Component +@RequiredArgsConstructor +@Slf4j public class MessagesListener { + private final TextProcessingService textProcessingService; + private final RabbitTemplate rabbitTemplate; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Value("${rabbitmq.results.exchange.name}") + private String resultsExchange; + + @Value("${rabbitmq.results.routing.key}") + private String resultsRoutingKey; + @RabbitListener(queues = "${rabbitmq.queue.name}") public void receiveMessage(String message) { - System.out.println("Received message: " + message); + log.info("Received message: {}", message); + + try { + // Parse the incoming message as TextTask + TextTask task = objectMapper.readValue(message, TextTask.class); + log.info("Processing task: {} of type: {}", task.getTaskId(), task.getTaskType()); + + // Process the task + TextProcessingResult result = textProcessingService.processTask(task); + + // Send result back to results queue + sendResult(result); + + log.info("Successfully processed task: {}", task.getTaskId()); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as TextTask: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing task: {}", e.getMessage(), e); + } + } + + private void sendResult(TextProcessingResult result) { + try { + rabbitTemplate.convertAndSend(resultsExchange, resultsRoutingKey, result); + log.info("Result sent for task: {}", result.getTaskId()); + } catch (Exception e) { + log.error("Failed to send result for task {}: {}", result.getTaskId(), e.getMessage(), e); + } } } diff --git a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java index fddf89f..731fae1 100644 --- a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -1,9 +1,9 @@ package ru.ifmo.configuration; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.FanoutExchange; -import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -11,6 +11,17 @@ @Configuration public class RabbitConfig { + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + @Bean public FanoutExchange fanoutExchange( @Value("${rabbitmq.exchange.name}") String exchangeName @@ -32,4 +43,27 @@ public Binding binding( ) { return BindingBuilder.bind(queue).to(fanoutExchange); } + + @Bean + public DirectExchange resultsExchange( + @Value("${rabbitmq.results.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue resultsQueue( + @Value("${rabbitmq.results.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding resultsBinding( + Queue resultsQueue, + DirectExchange resultsExchange, + @Value("${rabbitmq.results.routing.key}") String routingKey + ) { + return BindingBuilder.bind(resultsQueue).to(resultsExchange).with(routingKey); + } } diff --git a/worker/src/main/java/ru/ifmo/controller/TestController.java b/worker/src/main/java/ru/ifmo/controller/TestController.java new file mode 100644 index 0000000..9bbbd54 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/controller/TestController.java @@ -0,0 +1,35 @@ +package ru.ifmo.controller; + +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.*; +import ru.ifmo.dto.TextTask; +import ru.ifmo.dto.TextProcessingResult; +import ru.ifmo.service.TextProcessingService; + +@RestController +@RequestMapping("/api/test") +@RequiredArgsConstructor +public class TestController { + + private final TextProcessingService textProcessingService; + + @PostMapping("/process") + public TextProcessingResult processText(@RequestBody TextTask task) { + return textProcessingService.processTask(task); + } + + @GetMapping("/demo") + public TextProcessingResult demoProcessing() { + TextTask task = new TextTask(); + task.setTaskId("demo-001"); + task.setTaskType("ALL_TASKS"); + task.setTopN(5); + task.setNameReplacement("[PERSON]"); + task.setText("Hello, my name is John Smith. This is a wonderful day! " + + "I love programming and John enjoys reading books. " + + "Mary said she feels great today. " + + "The weather is terrible but we are happy."); + + return textProcessingService.processTask(task); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java new file mode 100644 index 0000000..7d80ac7 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java @@ -0,0 +1,23 @@ +package ru.ifmo.dto; + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class TextProcessingResult { + private String taskId; + private String taskType; + + private Integer wordCount; + + private Map topWords; + + private String sentiment; + private Double sentimentScore; + + private String modifiedText; + + private List sortedSentences; +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/dto/TextTask.java b/worker/src/main/java/ru/ifmo/dto/TextTask.java new file mode 100644 index 0000000..ffd6fc7 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/dto/TextTask.java @@ -0,0 +1,12 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class TextTask { + private String taskId; + private String text; + private String taskType; + private int topN; + private String nameReplacement; +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/NameReplacementService.java b/worker/src/main/java/ru/ifmo/service/NameReplacementService.java new file mode 100644 index 0000000..b0f269c --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/NameReplacementService.java @@ -0,0 +1,84 @@ +package ru.ifmo.service; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.springframework.stereotype.Service; + +@Service +public class NameReplacementService { + + private static final Pattern NAME_PATTERN = Pattern.compile( + "(? { + String prefix = matchResult.group().substring(0, matchResult.start(1) - matchResult.start()); + return prefix + replacement; + }); + + result = QUOTED_NAME_PATTERN.matcher(result) + .replaceAll(matchResult -> "\"" + replacement + "\"" + + matchResult.group().substring(matchResult.end(1) + 1)); + + Matcher nameMatcher = NAME_PATTERN.matcher(result); + StringBuffer sb = new StringBuffer(); + + while (nameMatcher.find()) { + String match = nameMatcher.group(); + if (isPotentialName(match)) { + nameMatcher.appendReplacement(sb, replacement); + } else { + nameMatcher.appendReplacement(sb, match); + } + } + nameMatcher.appendTail(sb); + + return sb.toString(); + } + + private boolean isPotentialName(String word) { + String lowerWord = word.toLowerCase(); + + String[] excludeWords = { + "january", "february", "march", "april", "may", "june", + "july", "august", "september", "october", "november", "december", + "monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday", + "январь", "февраль", "март", "апрель", "май", "июнь", + "июль", "август", "сентябрь", "октябрь", "ноябрь", "декабрь", + "понедельник", "вторник", "среда", "четверг", "пятница", "суббота", "воскресенье", + "the", "this", "that", "these", "those", "when", "where", "why", "how", + "это", "этот", "эта", "эти", "когда", "где", "почему", "как", + "god", "lord", "jesus", "christ", "allah", "buddha", + "бог", "господь", "иисус", "христос", "аллах", "будда", + "america", "europe", "asia", "africa", "australia", + "америка", "европа", "азия", "африка", "австралия" + }; + + for (String exclude : excludeWords) { + if (lowerWord.equals(exclude)) { + return false; + } + } + + return word.length() >= 2 && word.length() <= 20 && word.matches("^[A-ZА-Я][a-zа-я]+$"); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java b/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java new file mode 100644 index 0000000..0995b96 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/SentenceSortingService.java @@ -0,0 +1,78 @@ +package ru.ifmo.service; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; + +@Service +public class SentenceSortingService { + + private static final Pattern SENTENCE_PATTERN = Pattern.compile( + "(?<=[.!?])\\s+(?=[A-ZА-Я])" + ); + + public List sortSentencesByLength(String text) { + if (text == null || text.trim().isEmpty()) { + return new ArrayList<>(); + } + + List sentences = splitIntoSentences(text); + + return sentences.stream() + .filter(sentence -> !sentence.trim().isEmpty()) + .sorted(Comparator.comparing(String::length)) + .collect(Collectors.toList()); + } + + public List sortSentencesByLengthDescending(String text) { + if (text == null || text.trim().isEmpty()) { + return new ArrayList<>(); + } + + List sentences = splitIntoSentences(text); + + return sentences.stream() + .filter(sentence -> !sentence.trim().isEmpty()) + .sorted(Comparator.comparing(String::length).reversed()) + .collect(Collectors.toList()); + } + + private List splitIntoSentences(String text) { + String cleanText = text.trim().replaceAll("\\s+", " "); + + String[] sentences = SENTENCE_PATTERN.split(cleanText); + + List result = new ArrayList<>(); + for (String sentence : sentences) { + String trimmed = sentence.trim(); + if (!trimmed.isEmpty()) { + if (!trimmed.matches(".*[.!?]$")) { + if (!trimmed.endsWith(".") && !trimmed.endsWith("!") && !trimmed.endsWith("?")) { + trimmed += "."; + } + } + result.add(trimmed); + } + } + + if (result.isEmpty() && !cleanText.isEmpty()) { + String[] fallbackSentences = cleanText.split("(?<=[.!?])\\s+"); + for (String sentence : fallbackSentences) { + String trimmed = sentence.trim(); + if (!trimmed.isEmpty()) { + result.add(trimmed); + } + } + } + + if (result.isEmpty() && !cleanText.isEmpty()) { + result.add(cleanText); + } + + return result; + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java b/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java new file mode 100644 index 0000000..83fd51d --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/SentimentAnalysisService.java @@ -0,0 +1,120 @@ +package ru.ifmo.service; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import jakarta.annotation.PostConstruct; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Service; + +@Service +public class SentimentAnalysisService { + + private Set positiveWords; + private Set negativeWords; + + @PostConstruct + public void init() { + positiveWords = loadWordsFromFile("sentiment/positive-words.txt"); + negativeWords = loadWordsFromFile("sentiment/negative-words.txt"); + } + + public String analyzeSentiment(String text) { + if (text == null || text.trim().isEmpty()) { + return "NEUTRAL"; + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") + .trim() + .split("\\s+"); + + int positiveCount = 0; + int negativeCount = 0; + + for (String word : words) { + if (positiveWords.contains(word)) { + positiveCount++; + } else if (negativeWords.contains(word)) { + negativeCount++; + } + } + + if (positiveCount > negativeCount) { + return "POSITIVE"; + } else if (negativeCount > positiveCount) { + return "NEGATIVE"; + } else { + return "NEUTRAL"; + } + } + + public double calculateSentimentScore(String text) { + if (text == null || text.trim().isEmpty()) { + return 0.0; + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") + .trim() + .split("\\s+"); + + int positiveCount = 0; + int negativeCount = 0; + int totalWords = words.length; + + for (String word : words) { + if (positiveWords.contains(word)) { + positiveCount++; + } else if (negativeWords.contains(word)) { + negativeCount++; + } + } + + return totalWords > 0 ? (double) (positiveCount - negativeCount) / totalWords : 0.0; + } + + private Set loadWordsFromFile(String fileName) { + Set words = new HashSet<>(); + try { + ClassPathResource resource = new ClassPathResource(fileName); + if (resource.exists()) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(resource.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + line = line.trim().toLowerCase(); + if (!line.isEmpty() && !line.startsWith("#")) { + words.add(line); + } + } + } + } + } catch (IOException e) { + System.err.println("Could not load sentiment words from " + fileName + ": " + e.getMessage()); + return getBasicSentimentWords(fileName); + } + return words; + } + + private Set getBasicSentimentWords(String fileName) { + if (fileName.contains("positive")) { + return new HashSet<>(Arrays.asList( + "good", "great", "excellent", "amazing", "wonderful", "fantastic", "awesome", "perfect", + "love", "like", "enjoy", "happy", "pleased", "satisfied", "delighted", "thrilled", + "beautiful", "brilliant", "outstanding", "superb", "magnificent", "terrific", + "хорошо", "отлично", "прекрасно", "замечательно", "великолепно", "чудесно" + )); + } else { + return new HashSet<>(Arrays.asList( + "bad", "terrible", "awful", "horrible", "disgusting", "hate", "dislike", "angry", + "sad", "disappointed", "upset", "annoyed", "frustrated", "furious", "disgusted", + "worst", "pathetic", "useless", "stupid", "ridiculous", "absurd", + "плохо", "ужасно", "отвратительно", "кошмар", "ненавижу", "расстроен" + )); + } + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TextProcessingService.java b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java new file mode 100644 index 0000000..3515d79 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java @@ -0,0 +1,63 @@ +package ru.ifmo.service; + +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.TextTask; +import ru.ifmo.dto.TextProcessingResult; + +@Service +@RequiredArgsConstructor +public class TextProcessingService { + + private final WordCountService wordCountService; + private final TopWordsService topWordsService; + private final SentimentAnalysisService sentimentAnalysisService; + private final NameReplacementService nameReplacementService; + private final SentenceSortingService sentenceSortingService; + + public TextProcessingResult processTask(TextTask task) { + TextProcessingResult result = new TextProcessingResult(); + result.setTaskId(task.getTaskId()); + result.setTaskType(task.getTaskType()); + + switch (task.getTaskType().toUpperCase()) { + case "WORD_COUNT": + result.setWordCount(wordCountService.countWords(task.getText())); + break; + + case "TOP_WORDS": + int topN = task.getTopN() > 0 ? task.getTopN() : 10; // Default to 10 + result.setTopWords(topWordsService.findTopWords(task.getText(), topN)); + break; + + case "SENTIMENT_ANALYSIS": + result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); + result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); + break; + + case "NAME_REPLACEMENT": + String replacement = task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]"; + result.setModifiedText(nameReplacementService.replaceNames(task.getText(), replacement)); + break; + + case "SENTENCE_SORTING": + result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); + break; + + case "ALL_TASKS": + result.setWordCount(wordCountService.countWords(task.getText())); + result.setTopWords(topWordsService.findTopWords(task.getText(), task.getTopN() > 0 ? task.getTopN() : 5)); + result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); + result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); + result.setModifiedText(nameReplacementService.replaceNames(task.getText(), + task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]")); + result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); + break; + + default: + throw new IllegalArgumentException("Unknown task type: " + task.getTaskType()); + } + + return result; + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TopWordsService.java b/worker/src/main/java/ru/ifmo/service/TopWordsService.java new file mode 100644 index 0000000..d009f59 --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/TopWordsService.java @@ -0,0 +1,46 @@ +package ru.ifmo.service; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.stereotype.Service; + +@Service +public class TopWordsService { + + public Map findTopWords(String text, int topN) { + if (text == null || text.trim().isEmpty() || topN <= 0) { + return new HashMap<>(); + } + + String[] words = text.toLowerCase() + .replaceAll("[^a-zA-Zа-яА-Я\\s]", " ") // Remove punctuation + .trim() + .split("\\s+"); + + Map wordCount = Arrays.stream(words) + .filter(word -> !word.trim().isEmpty()) + .filter(this::isValidWord) + .collect(Collectors.groupingBy( + word -> word, + Collectors.summingInt(word -> 1) + )); + + return wordCount.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .limit(topN) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new + )); + } + + private boolean isValidWord(String word) { + return word.length() > 1 && word.matches(".*[a-zA-Zа-яА-Я].*"); + } +} \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/WordCountService.java b/worker/src/main/java/ru/ifmo/service/WordCountService.java new file mode 100644 index 0000000..de8a20b --- /dev/null +++ b/worker/src/main/java/ru/ifmo/service/WordCountService.java @@ -0,0 +1,26 @@ +package ru.ifmo.service; + +import java.util.Arrays; + +import org.springframework.stereotype.Service; + +@Service +public class WordCountService { + + public int countWords(String text) { + if (text == null || text.trim().isEmpty()) { + return 0; + } + + String[] words = text.trim().split("\\s+"); + + return (int) Arrays.stream(words) + .filter(word -> !word.trim().isEmpty()) + .filter(this::isWord) + .count(); + } + + private boolean isWord(String token) { + return token.matches(".*[a-zA-Zа-яА-Я].*"); + } +} \ No newline at end of file diff --git a/worker/src/main/resources/application.properties b/worker/src/main/resources/application.properties index 0a62dc2..1410d7e 100644 --- a/worker/src/main/resources/application.properties +++ b/worker/src/main/resources/application.properties @@ -7,3 +7,8 @@ spring.rabbitmq.password=guest rabbitmq.exchange.name=textTopic rabbitmq.queue.name=workerQueue + +# Results queue configuration +rabbitmq.results.exchange.name=resultsExchange +rabbitmq.results.queue.name=resultsQueue +rabbitmq.results.routing.key=results diff --git a/worker/src/main/resources/sentiment/negative-words.txt b/worker/src/main/resources/sentiment/negative-words.txt new file mode 100644 index 0000000..1347bd0 --- /dev/null +++ b/worker/src/main/resources/sentiment/negative-words.txt @@ -0,0 +1,42 @@ +# Negative words for sentiment analysis +bad +terrible +awful +horrible +disgusting +hate +dislike +angry +sad +disappointed +upset +annoyed +frustrated +furious +disgusted +worst +pathetic +useless +stupid +ridiculous +absurd +dreadful +appalling +atrocious +ghastly +hideous +revolting +repulsive +abominable +detestable +loathsome +плохо +ужасно +отвратительно +кошмар +ненавижу +расстроен +злой +грустно +разочарован +раздражен \ No newline at end of file diff --git a/worker/src/main/resources/sentiment/positive-words.txt b/worker/src/main/resources/sentiment/positive-words.txt new file mode 100644 index 0000000..df337b0 --- /dev/null +++ b/worker/src/main/resources/sentiment/positive-words.txt @@ -0,0 +1,41 @@ +# Positive words for sentiment analysis +good +great +excellent +amazing +wonderful +fantastic +awesome +perfect +love +like +enjoy +happy +pleased +satisfied +delighted +thrilled +beautiful +brilliant +outstanding +superb +magnificent +terrific +splendid +marvelous +incredible +fabulous +spectacular +impressive +remarkable +exceptional +хорошо +отлично +прекрасно +замечательно +великолепно +чудесно +превосходно +восхитительно +блестяще +удивительно \ No newline at end of file From d6326e13619a245a088867a78fad13eb4a9eab16 Mon Sep 17 00:00:00 2001 From: railolog Date: Mon, 24 Nov 2025 22:17:29 +0300 Subject: [PATCH 07/11] change to executing all task types --- .../ru/ifmo/controller/TestController.java | 1 - .../ru/ifmo/dto/TextProcessingResult.java | 10 +--- .../src/main/java/ru/ifmo/dto/TextTask.java | 1 - .../ifmo/service/TextProcessingService.java | 58 +++++-------------- .../java/ru/ifmo/service/TopWordsService.java | 3 +- 5 files changed, 19 insertions(+), 54 deletions(-) diff --git a/worker/src/main/java/ru/ifmo/controller/TestController.java b/worker/src/main/java/ru/ifmo/controller/TestController.java index 9bbbd54..722ae00 100644 --- a/worker/src/main/java/ru/ifmo/controller/TestController.java +++ b/worker/src/main/java/ru/ifmo/controller/TestController.java @@ -22,7 +22,6 @@ public TextProcessingResult processText(@RequestBody TextTask task) { public TextProcessingResult demoProcessing() { TextTask task = new TextTask(); task.setTaskId("demo-001"); - task.setTaskType("ALL_TASKS"); task.setTopN(5); task.setNameReplacement("[PERSON]"); task.setText("Hello, my name is John Smith. This is a wonderful day! " + diff --git a/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java index 7d80ac7..d83331b 100644 --- a/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java +++ b/worker/src/main/java/ru/ifmo/dto/TextProcessingResult.java @@ -8,16 +8,10 @@ @Data public class TextProcessingResult { private String taskId; - private String taskType; - private Integer wordCount; - private Map topWords; - - private String sentiment; + private String sentiment; // POSITIVE, NEGATIVE, NEUTRAL private Double sentimentScore; - private String modifiedText; - private List sortedSentences; -} \ No newline at end of file +} diff --git a/worker/src/main/java/ru/ifmo/dto/TextTask.java b/worker/src/main/java/ru/ifmo/dto/TextTask.java index ffd6fc7..36a438d 100644 --- a/worker/src/main/java/ru/ifmo/dto/TextTask.java +++ b/worker/src/main/java/ru/ifmo/dto/TextTask.java @@ -6,7 +6,6 @@ public class TextTask { private String taskId; private String text; - private String taskType; private int topN; private String nameReplacement; } \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TextProcessingService.java b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java index 3515d79..fbc8acc 100644 --- a/worker/src/main/java/ru/ifmo/service/TextProcessingService.java +++ b/worker/src/main/java/ru/ifmo/service/TextProcessingService.java @@ -8,56 +8,30 @@ @Service @RequiredArgsConstructor public class TextProcessingService { - + private final WordCountService wordCountService; private final TopWordsService topWordsService; private final SentimentAnalysisService sentimentAnalysisService; private final NameReplacementService nameReplacementService; private final SentenceSortingService sentenceSortingService; - + public TextProcessingResult processTask(TextTask task) { TextProcessingResult result = new TextProcessingResult(); result.setTaskId(task.getTaskId()); - result.setTaskType(task.getTaskType()); - - switch (task.getTaskType().toUpperCase()) { - case "WORD_COUNT": - result.setWordCount(wordCountService.countWords(task.getText())); - break; - - case "TOP_WORDS": - int topN = task.getTopN() > 0 ? task.getTopN() : 10; // Default to 10 - result.setTopWords(topWordsService.findTopWords(task.getText(), topN)); - break; - - case "SENTIMENT_ANALYSIS": - result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); - result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); - break; - - case "NAME_REPLACEMENT": - String replacement = task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]"; - result.setModifiedText(nameReplacementService.replaceNames(task.getText(), replacement)); - break; - - case "SENTENCE_SORTING": - result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); - break; - - case "ALL_TASKS": - result.setWordCount(wordCountService.countWords(task.getText())); - result.setTopWords(topWordsService.findTopWords(task.getText(), task.getTopN() > 0 ? task.getTopN() : 5)); - result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); - result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); - result.setModifiedText(nameReplacementService.replaceNames(task.getText(), - task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]")); - result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); - break; - - default: - throw new IllegalArgumentException("Unknown task type: " + task.getTaskType()); - } - + + result.setWordCount(wordCountService.countWords(task.getText())); + + int topN = task.getTopN() > 0 ? task.getTopN() : 5; + result.setTopWords(topWordsService.findTopWords(task.getText(), topN)); + + result.setSentiment(sentimentAnalysisService.analyzeSentiment(task.getText())); + result.setSentimentScore(sentimentAnalysisService.calculateSentimentScore(task.getText())); + + String replacement = task.getNameReplacement() != null ? task.getNameReplacement() : "[NAME]"; + result.setModifiedText(nameReplacementService.replaceNames(task.getText(), replacement)); + + result.setSortedSentences(sentenceSortingService.sortSentencesByLength(task.getText())); + return result; } } \ No newline at end of file diff --git a/worker/src/main/java/ru/ifmo/service/TopWordsService.java b/worker/src/main/java/ru/ifmo/service/TopWordsService.java index d009f59..6283782 100644 --- a/worker/src/main/java/ru/ifmo/service/TopWordsService.java +++ b/worker/src/main/java/ru/ifmo/service/TopWordsService.java @@ -31,11 +31,10 @@ public Map findTopWords(String text, int topN) { return wordCount.entrySet().stream() .sorted(Map.Entry.comparingByValue().reversed()) - .limit(topN) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, - (e1, e2) -> e1, + Math::max, LinkedHashMap::new )); } From 3a5baf5d8de3c3ad40cb52a2583303d3f0a1c648 Mon Sep 17 00:00:00 2001 From: railolog Date: Mon, 24 Nov 2025 22:24:23 +0300 Subject: [PATCH 08/11] init aggregator --- aggregator/pom.xml | 74 +++++++++++++++++++ aggregator/src/main/java/ru/ifmo/App.java | 11 +++ .../src/main/java/ru/ifmo/AppConfig.java | 7 ++ .../ru/ifmo/configuration/RabbitConfig.java | 8 ++ .../src/main/resources/application.properties | 15 ++++ producer/pom.xml | 17 +++-- 6 files changed, 124 insertions(+), 8 deletions(-) create mode 100644 aggregator/pom.xml create mode 100644 aggregator/src/main/java/ru/ifmo/App.java create mode 100644 aggregator/src/main/java/ru/ifmo/AppConfig.java create mode 100644 aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java create mode 100644 aggregator/src/main/resources/application.properties diff --git a/aggregator/pom.xml b/aggregator/pom.xml new file mode 100644 index 0000000..dd0558f --- /dev/null +++ b/aggregator/pom.xml @@ -0,0 +1,74 @@ + + 4.0.0 + + ru.ifmo + aggregator + 1.0 + jar + + Aggregator + + + org.springframework.boot + spring-boot-starter-parent + 3.4.12 + + + + UTF-8 + ru.ifmo.App + 21 + 1.18.30 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-amqp + + + org.projectlombok + lombok + ${lombok.version} + provided + + + org.springdoc + springdoc-openapi-starter-webmvc-ui + 2.2.0 + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + --add-opens java.base/java.lang=ALL-UNNAMED + + + + org.springframework.boot + spring-boot-maven-plugin + + + --add-opens java.base/java.lang=ALL-UNNAMED + --add-opens java.base/java.util=ALL-UNNAMED + + + + + + + + + + + + + diff --git a/aggregator/src/main/java/ru/ifmo/App.java b/aggregator/src/main/java/ru/ifmo/App.java new file mode 100644 index 0000000..942563c --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/App.java @@ -0,0 +1,11 @@ +package ru.ifmo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class App { + public static void main(String[] args) { + SpringApplication.run(App.class, args); + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/AppConfig.java b/aggregator/src/main/java/ru/ifmo/AppConfig.java new file mode 100644 index 0000000..af5370e --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/AppConfig.java @@ -0,0 +1,7 @@ +package ru.ifmo; + +import org.springframework.context.annotation.Configuration; + +@Configuration +public class AppConfig { +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java new file mode 100644 index 0000000..7066084 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -0,0 +1,8 @@ +package ru.ifmo.configuration; + +import org.springframework.context.annotation.Configuration; + +@Configuration +public class RabbitConfig { + +} diff --git a/aggregator/src/main/resources/application.properties b/aggregator/src/main/resources/application.properties new file mode 100644 index 0000000..e00bb1d --- /dev/null +++ b/aggregator/src/main/resources/application.properties @@ -0,0 +1,15 @@ +server.port=${PORT:8079} + +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +spring.rabbitmq.username=guest +spring.rabbitmq.password=guest + +rabbitmq.exchange.name=textTopic + +# Swagger/OpenAPI Configuration +springdoc.api-docs.path=/api-docs +springdoc.swagger-ui.path=/swagger-ui.html +springdoc.swagger-ui.operationsSorter=method +springdoc.swagger-ui.tagsSorter=alpha +springdoc.swagger-ui.tryItOutEnabled=true diff --git a/producer/pom.xml b/producer/pom.xml index 2c1aed0..a5e3c4c 100644 --- a/producer/pom.xml +++ b/producer/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 ru.ifmo @@ -61,13 +62,13 @@ --add-opens java.base/java.util=ALL-UNNAMED - - - - - - - + + + + + + + From bbb0eb136d65c3ae91e73322feb3d46e7427567c Mon Sep 17 00:00:00 2001 From: railolog Date: Tue, 25 Nov 2025 00:13:27 +0300 Subject: [PATCH 09/11] implement aggregator --- aggregator/pom.xml | 8 + .../ru/ifmo/configuration/RabbitConfig.java | 86 ++++++++ .../ifmo/controller/AggregatorController.java | 102 +++++++++ .../java/ru/ifmo/dto/AggregatedResult.java | 30 +++ .../main/java/ru/ifmo/dto/SectionResult.java | 16 ++ .../main/java/ru/ifmo/dto/SessionInfo.java | 10 + .../ru/ifmo/dto/TextProcessingResult.java | 17 ++ .../ru/ifmo/listener/ResultsListener.java | 101 +++++++++ .../ru/ifmo/listener/SessionInfoListener.java | 43 ++++ .../ru/ifmo/service/AggregationService.java | 208 ++++++++++++++++++ .../ru/ifmo/service/ResultStorageService.java | 157 +++++++++++++ .../src/main/resources/application.properties | 15 +- 12 files changed, 792 insertions(+), 1 deletion(-) create mode 100644 aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java create mode 100644 aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java create mode 100644 aggregator/src/main/java/ru/ifmo/dto/SectionResult.java create mode 100644 aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java create mode 100644 aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java create mode 100644 aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java create mode 100644 aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java create mode 100644 aggregator/src/main/java/ru/ifmo/service/AggregationService.java create mode 100644 aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java diff --git a/aggregator/pom.xml b/aggregator/pom.xml index dd0558f..3479cbd 100644 --- a/aggregator/pom.xml +++ b/aggregator/pom.xml @@ -42,6 +42,14 @@ springdoc-openapi-starter-webmvc-ui 2.2.0 + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + diff --git a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java index 7066084..acb2977 100644 --- a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -1,8 +1,94 @@ package ru.ifmo.configuration; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { + @Bean + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + @Bean + public DirectExchange resultsExchange( + @Value("${rabbitmq.results.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue resultsQueue( + @Value("${rabbitmq.results.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding resultsBinding( + Queue resultsQueue, + DirectExchange resultsExchange, + @Value("${rabbitmq.results.routing.key}") String routingKey + ) { + return BindingBuilder.bind(resultsQueue).to(resultsExchange).with(routingKey); + } + + @Bean + public DirectExchange sessionInfoExchange( + @Value("${rabbitmq.session.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue sessionInfoQueue( + @Value("${rabbitmq.session.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding sessionInfoBinding( + Queue sessionInfoQueue, + DirectExchange sessionInfoExchange, + @Value("${rabbitmq.session.routing.key}") String routingKey + ) { + return BindingBuilder.bind(sessionInfoQueue).to(sessionInfoExchange).with(routingKey); + } + + @Bean + public DirectExchange finalResultsExchange( + @Value("${rabbitmq.final.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } + + @Bean + public Queue finalResultsQueue( + @Value("${rabbitmq.final.queue.name}") String queueName + ) { + return new Queue(queueName); + } + + @Bean + public Binding finalResultsBinding( + Queue finalResultsQueue, + DirectExchange finalResultsExchange, + @Value("${rabbitmq.final.routing.key}") String routingKey + ) { + return BindingBuilder.bind(finalResultsQueue).to(finalResultsExchange).with(routingKey); + } } diff --git a/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java b/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java new file mode 100644 index 0000000..f9f04a3 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java @@ -0,0 +1,102 @@ +package ru.ifmo.controller; + +import lombok.RequiredArgsConstructor; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import ru.ifmo.dto.AggregatedResult; +import ru.ifmo.service.AggregationService; +import ru.ifmo.service.ResultStorageService; + +import java.util.List; + +@RestController +@RequestMapping("/api/aggregator") +@RequiredArgsConstructor +public class AggregatorController { + + private final AggregationService aggregationService; + private final ResultStorageService resultStorageService; + + @GetMapping("/sessions") + public ResponseEntity> getActiveSessions() { + List sessions = aggregationService.getActiveSessions(); + return ResponseEntity.ok(sessions); + } + + @GetMapping("/sessions/{sessionId}/count") + public ResponseEntity getResultCount(@PathVariable String sessionId) { + int count = aggregationService.getResultCount(sessionId); + return ResponseEntity.ok(count); + } + + @PostMapping("/sessions/{sessionId}/expected/{expectedCount}") + public ResponseEntity setExpectedTaskCount( + @PathVariable String sessionId, + @PathVariable int expectedCount) { + aggregationService.setExpectedTaskCount(sessionId, expectedCount); + return ResponseEntity.ok().build(); + } + + @GetMapping("/sessions/{sessionId}/ready") + public ResponseEntity isSessionReady(@PathVariable String sessionId) { + boolean ready = aggregationService.isReadyForAggregation(sessionId); + return ResponseEntity.ok(ready); + } + + @PostMapping("/sessions/{sessionId}/aggregate") + public ResponseEntity triggerAggregation(@PathVariable String sessionId) { + AggregatedResult result = aggregationService.aggregateResults(sessionId); + if (result != null) { + resultStorageService.storeResult(result); + return ResponseEntity.ok(result); + } + return ResponseEntity.notFound().build(); + } + + @DeleteMapping("/sessions/{sessionId}") + public ResponseEntity clearSession(@PathVariable String sessionId) { + aggregationService.clearSession(sessionId); + return ResponseEntity.ok().build(); + } + + @GetMapping("/results") + public ResponseEntity> getStoredResultIds() { + List resultIds = resultStorageService.getStoredResultIds(); + return ResponseEntity.ok(resultIds); + } + + @GetMapping("/results/{aggregationId}") + public ResponseEntity getResult(@PathVariable String aggregationId) { + AggregatedResult result = resultStorageService.getResult(aggregationId); + if (result != null) { + return ResponseEntity.ok(result); + } + return ResponseEntity.notFound().build(); + } + + @GetMapping("/results/all") + public ResponseEntity> getAllResults() { + List results = resultStorageService.getAllResults(); + return ResponseEntity.ok(results); + } + + @DeleteMapping("/results/{aggregationId}") + public ResponseEntity deleteResult(@PathVariable String aggregationId) { + boolean deleted = resultStorageService.deleteResult(aggregationId); + if (deleted) { + return ResponseEntity.ok().build(); + } + return ResponseEntity.notFound().build(); + } + + @DeleteMapping("/results") + public ResponseEntity clearAllResults() { + resultStorageService.clearAllResults(); + return ResponseEntity.ok().build(); + } + + @GetMapping("/health") + public ResponseEntity health() { + return ResponseEntity.ok("Aggregator is running"); + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java new file mode 100644 index 0000000..4bdd073 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java @@ -0,0 +1,30 @@ +package ru.ifmo.dto; + +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class AggregatedResult { + private String aggregationId; + private LocalDateTime timestamp; + private int totalSections; + private List processedTaskIds; + + private Long totalWordCount; + + private Map mergedTopWords; + + private String overallSentiment; // POSITIVE, NEGATIVE, NEUTRAL + private Double averageSentimentScore; + private Map sentimentDistribution; + + private List modifiedTextSections; + private String combinedModifiedText; + + private List allSortedSentences; + + private List sectionResults; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java b/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java new file mode 100644 index 0000000..c06cab5 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/SectionResult.java @@ -0,0 +1,16 @@ +package ru.ifmo.dto; + +import lombok.Data; +import java.util.List; +import java.util.Map; + +@Data +public class SectionResult { + private String taskId; + private Integer wordCount; + private Map topWords; + private String sentiment; + private Double sentimentScore; + private String modifiedText; + private List sortedSentences; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java new file mode 100644 index 0000000..039c0a1 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -0,0 +1,10 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class SessionInfo { + private String sessionId; + private int expectedTaskCount; + private String description; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java b/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java new file mode 100644 index 0000000..c0d7f24 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/dto/TextProcessingResult.java @@ -0,0 +1,17 @@ +package ru.ifmo.dto; + +import java.util.List; +import java.util.Map; + +import lombok.Data; + +@Data +public class TextProcessingResult { + private String taskId; + private Integer wordCount; + private Map topWords; + private String sentiment; // POSITIVE, NEGATIVE, NEUTRAL + private Double sentimentScore; + private String modifiedText; + private List sortedSentences; +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java b/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java new file mode 100644 index 0000000..3908093 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/listener/ResultsListener.java @@ -0,0 +1,101 @@ +package ru.ifmo.listener; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import ru.ifmo.dto.AggregatedResult; +import ru.ifmo.dto.TextProcessingResult; +import ru.ifmo.service.AggregationService; +import ru.ifmo.service.ResultStorageService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class ResultsListener { + + private final AggregationService aggregationService; + private final ResultStorageService resultStorageService; + private final RabbitTemplate rabbitTemplate; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Value("${rabbitmq.final.exchange.name}") + private String finalExchange; + + @Value("${rabbitmq.final.routing.key}") + private String finalRoutingKey; + + @RabbitListener(queues = "${rabbitmq.results.queue.name}") + public void receiveResult(String message) { + log.info("Received result message: {}", message); + + try { + TextProcessingResult result = objectMapper.readValue(message, TextProcessingResult.class); + log.info("Processing result for task: {}", result.getTaskId()); + + String sessionId = extractSessionId(result.getTaskId()); + + aggregationService.addResult(sessionId, result); + + checkAndTriggerAggregation(sessionId); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as TextProcessingResult: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing result: {}", e.getMessage(), e); + } + } + + private String extractSessionId(String taskId) { + // taskId format "session-001-task-001" + if (taskId != null && taskId.contains("-")) { + String[] parts = taskId.split("-"); + if (parts.length >= 2) { + return parts[0] + "-" + parts[1]; + } + } + return "default-session"; + } + + private void checkAndTriggerAggregation(String sessionId) { + if (aggregationService.isReadyForAggregation(sessionId)) { + log.info("Session {} is ready for aggregation, triggering...", sessionId); + triggerAggregation(sessionId); + } else { + int resultCount = aggregationService.getResultCount(sessionId); + log.info("Session {} not ready yet: {} results received", sessionId, resultCount); + } + } + + private void triggerAggregation(String sessionId) { + try { + log.info("Triggering aggregation for session: {}", sessionId); + + AggregatedResult aggregatedResult = aggregationService.aggregateResults(sessionId); + if (aggregatedResult != null) { + resultStorageService.storeResult(aggregatedResult); + + publishFinalResult(aggregatedResult); + + log.info("Aggregation completed and published for session: {}", sessionId); + } + + } catch (Exception e) { + log.error("Error during aggregation for session {}: {}", sessionId, e.getMessage(), e); + } + } + + private void publishFinalResult(AggregatedResult result) { + try { + rabbitTemplate.convertAndSend(finalExchange, finalRoutingKey, result); + log.info("Final result published for aggregation: {}", result.getAggregationId()); + } catch (Exception e) { + log.error("Failed to publish final result for aggregation {}: {}", + result.getAggregationId(), e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java new file mode 100644 index 0000000..397322a --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java @@ -0,0 +1,43 @@ +package ru.ifmo.listener; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.service.AggregationService; + +@Component +@RequiredArgsConstructor +@Slf4j +public class SessionInfoListener { + + private final AggregationService aggregationService; + private final ObjectMapper objectMapper = new ObjectMapper(); + + @RabbitListener(queues = "${rabbitmq.session.queue.name}") + public void receiveSessionInfo(String message) { + log.info("Received session info message: {}", message); + + try { + SessionInfo sessionInfo = objectMapper.readValue(message, SessionInfo.class); + log.info("Processing session info: {} with expected {} tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + + aggregationService.setExpectedTaskCount( + sessionInfo.getSessionId(), + sessionInfo.getExpectedTaskCount() + ); + + log.info("Successfully registered session {} with {} expected tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + + } catch (JsonProcessingException e) { + log.error("Failed to parse message as SessionInfo: {}", e.getMessage()); + } catch (Exception e) { + log.error("Error processing session info: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java new file mode 100644 index 0000000..f02b58b --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java @@ -0,0 +1,208 @@ +package ru.ifmo.service; + +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.AggregatedResult; +import ru.ifmo.dto.SectionResult; +import ru.ifmo.dto.TextProcessingResult; + +@Service +@Slf4j +public class AggregationService { + + private final Map> aggregationSessions = new ConcurrentHashMap<>(); + + private final Map expectedTaskCounts = new ConcurrentHashMap<>(); + + public void setExpectedTaskCount(String sessionId, int expectedCount) { + expectedTaskCounts.put(sessionId, expectedCount); + log.info("Set expected task count for session {}: {}", sessionId, expectedCount); + } + + public void addResult(String sessionId, TextProcessingResult result) { + aggregationSessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(result); + log.info("Added result for task {} to session {} ({}/{})", + result.getTaskId(), sessionId, + getResultCount(sessionId), + expectedTaskCounts.get(sessionId)); + } + + public boolean isReadyForAggregation(String sessionId) { + Integer expectedCount = expectedTaskCounts.get(sessionId); + if (expectedCount == null) { + log.warn("No expected count set for session {}", sessionId); + return false; + } + + int currentCount = getResultCount(sessionId); + boolean ready = currentCount >= expectedCount; + + if (ready) { + log.info("Session {} is ready for aggregation: {}/{} tasks completed", + sessionId, currentCount, expectedCount); + } + + return ready; + } + + public AggregatedResult aggregateResults(String sessionId) { + List results = aggregationSessions.get(sessionId); + if (results == null || results.isEmpty()) { + log.warn("No results found for session {}", sessionId); + return null; + } + + log.info("Aggregating {} results for session {}", results.size(), sessionId); + + AggregatedResult aggregated = new AggregatedResult(); + aggregated.setAggregationId(sessionId); + aggregated.setTimestamp(LocalDateTime.now()); + aggregated.setTotalSections(results.size()); + aggregated.setProcessedTaskIds(results.stream() + .map(TextProcessingResult::getTaskId) + .collect(Collectors.toList())); + + aggregated.setTotalWordCount(aggregateWordCounts(results)); + + aggregated.setMergedTopWords(mergeTopWords(results)); + + aggregateSentiment(results, aggregated); + + aggregateModifiedText(results, aggregated); + + aggregated.setAllSortedSentences(aggregateSortedSentences(results)); + + aggregated.setSectionResults(createSectionResults(results)); + + log.info("Aggregation completed for session {}", sessionId); + return aggregated; + } + + private Long aggregateWordCounts(List results) { + return results.stream() + .filter(r -> r.getWordCount() != null) + .mapToLong(r -> r.getWordCount().longValue()) + .sum(); + } + + private Map mergeTopWords(List results) { + Map mergedWords = new HashMap<>(); + + for (TextProcessingResult result : results) { + if (result.getTopWords() != null) { + for (Map.Entry entry : result.getTopWords().entrySet()) { + mergedWords.merge(entry.getKey(), entry.getValue(), Integer::sum); + } + } + } + + return mergedWords.entrySet().stream() + .sorted(Map.Entry.comparingByValue().reversed()) + .collect(Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e1, + LinkedHashMap::new + )); + } + + private void aggregateSentiment(List results, AggregatedResult aggregated) { + List sentiments = results.stream() + .map(TextProcessingResult::getSentiment) + .filter(Objects::nonNull) + .toList(); + + List scores = results.stream() + .map(TextProcessingResult::getSentimentScore) + .filter(Objects::nonNull) + .toList(); + + Map distribution = sentiments.stream() + .collect(Collectors.groupingBy( + sentiment -> sentiment, + Collectors.summingInt(sentiment -> 1) + )); + aggregated.setSentimentDistribution(distribution); + + if (!scores.isEmpty()) { + double avgScore = scores.stream().mapToDouble(Double::doubleValue).average().orElse(0.0); + aggregated.setAverageSentimentScore(avgScore); + } + + String overallSentiment = distribution.entrySet().stream() + .max(Map.Entry.comparingByValue()) + .map(Map.Entry::getKey) + .orElse("NEUTRAL"); + aggregated.setOverallSentiment(overallSentiment); + } + + private void aggregateModifiedText(List results, AggregatedResult aggregated) { + List modifiedSections = results.stream() + .map(TextProcessingResult::getModifiedText) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + aggregated.setModifiedTextSections(modifiedSections); + + String combinedText = String.join("\n\n", modifiedSections); + aggregated.setCombinedModifiedText(combinedText); + } + + private List aggregateSortedSentences(List results) { + List allSentences = new ArrayList<>(); + + for (TextProcessingResult result : results) { + if (result.getSortedSentences() != null) { + allSentences.addAll(result.getSortedSentences()); + } + } + + return allSentences.stream() + .sorted(Comparator.comparing(String::length)) + .collect(Collectors.toList()); + } + + private List createSectionResults(List results) { + return results.stream() + .map(this::convertToSectionResult) + .collect(Collectors.toList()); + } + + private SectionResult convertToSectionResult(TextProcessingResult result) { + SectionResult section = new SectionResult(); + section.setTaskId(result.getTaskId()); + section.setWordCount(result.getWordCount()); + section.setTopWords(result.getTopWords()); + section.setSentiment(result.getSentiment()); + section.setSentimentScore(result.getSentimentScore()); + section.setModifiedText(result.getModifiedText()); + section.setSortedSentences(result.getSortedSentences()); + return section; + } + + public void clearSession(String sessionId) { + aggregationSessions.remove(sessionId); + expectedTaskCounts.remove(sessionId); + log.info("Cleared session {}", sessionId); + } + + public List getActiveSessions() { + return new ArrayList<>(aggregationSessions.keySet()); + } + + public int getResultCount(String sessionId) { + List results = aggregationSessions.get(sessionId); + return results != null ? results.size() : 0; + } +} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java new file mode 100644 index 0000000..3ca3071 --- /dev/null +++ b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java @@ -0,0 +1,157 @@ +package ru.ifmo.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.AggregatedResult; + +@Service +@Slf4j +public class ResultStorageService { + + private final ObjectMapper objectMapper = new ObjectMapper(); + private final ConcurrentMap storedResults = new ConcurrentHashMap<>(); + private final String resultsDirectory = "results"; + + public ResultStorageService() { + try { + Path resultsPath = Paths.get(resultsDirectory); + if (!Files.exists(resultsPath)) { + Files.createDirectories(resultsPath); + log.info("Created results directory: {}", resultsPath.toAbsolutePath()); + } + } catch (IOException e) { + log.error("Failed to create results directory: {}", e.getMessage()); + } + } + + public void storeResult(AggregatedResult result) { + try { + storedResults.put(result.getAggregationId(), result); + + saveToFile(result); + + log.info("Stored aggregated result for session: {}", result.getAggregationId()); + + } catch (Exception e) { + log.error("Failed to store result for session {}: {}", + result.getAggregationId(), e.getMessage(), e); + } + } + + private void saveToFile(AggregatedResult result) throws IOException { + String timestamp = result.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); + String filename = String.format("%s_%s.json", result.getAggregationId(), timestamp); + Path filePath = Paths.get(resultsDirectory, filename); + + objectMapper.writerWithDefaultPrettyPrinter().writeValue(filePath.toFile(), result); + log.info("Saved result to file: {}", filePath.toAbsolutePath()); + + saveSummaryReport(result, timestamp); + } + + private void saveSummaryReport(AggregatedResult result, String timestamp) throws IOException { + String filename = String.format("%s_%s_summary.txt", result.getAggregationId(), timestamp); + Path filePath = Paths.get(resultsDirectory, filename); + + StringBuilder report = new StringBuilder(); + report.append("=== TEXT PROCESSING AGGREGATION REPORT ===\n"); + report.append("Aggregation ID: ").append(result.getAggregationId()).append("\n"); + report.append("Timestamp: ").append(result.getTimestamp()).append("\n"); + report.append("Total Sections Processed: ").append(result.getTotalSections()).append("\n"); + report.append("Processed Task IDs: ").append(String.join(", ", result.getProcessedTaskIds())).append("\n\n"); + + report.append("=== WORD COUNT SUMMARY ===\n"); + report.append("Total Words: ").append(result.getTotalWordCount()).append("\n\n"); + + report.append("=== TOP WORDS SUMMARY ===\n"); + if (result.getMergedTopWords() != null) { + result.getMergedTopWords().entrySet().stream() + .limit(10) + .forEach(entry -> report.append(String.format("%-20s: %d\n", entry.getKey(), entry.getValue()))); + } + report.append("\n"); + + report.append("=== SENTIMENT ANALYSIS SUMMARY ===\n"); + report.append("Overall Sentiment: ").append(result.getOverallSentiment()).append("\n"); + report.append("Average Sentiment Score: ").append(String.format("%.3f", result.getAverageSentimentScore())).append("\n"); + if (result.getSentimentDistribution() != null) { + report.append("Sentiment Distribution:\n"); + result.getSentimentDistribution().forEach((sentiment, count) -> + report.append(String.format(" %s: %d sections\n", sentiment, count))); + } + report.append("\n"); + + report.append("=== TEXT MODIFICATION SUMMARY ===\n"); + report.append("Modified Text Sections: ").append(result.getModifiedTextSections() != null ? + result.getModifiedTextSections().size() : 0).append("\n"); + if (result.getCombinedModifiedText() != null) { + report.append("Combined Text Length: ").append(result.getCombinedModifiedText().length()).append(" " + + "characters\n"); + } + report.append("\n"); + + report.append("=== SENTENCE SORTING SUMMARY ===\n"); + report.append("Total Sorted Sentences: ").append(result.getAllSortedSentences() != null ? + result.getAllSortedSentences().size() : 0).append("\n"); + if (result.getAllSortedSentences() != null && !result.getAllSortedSentences().isEmpty()) { + report.append("Shortest Sentence: ").append(result.getAllSortedSentences().getFirst()).append("\n"); + report.append("Longest Sentence: ").append(result.getAllSortedSentences().getLast()).append("\n"); + } + report.append("\n"); + + report.append("=== PER-SECTION SUMMARY ===\n"); + if (result.getSectionResults() != null) { + result.getSectionResults().forEach(section -> { + report.append(String.format("Task ID: %s\n", section.getTaskId())); + report.append(String.format(" Word Count: %d\n", section.getWordCount())); + report.append(String.format(" Sentiment: %s (%.3f)\n", section.getSentiment(), + section.getSentimentScore())); + report.append(String.format(" Top Words: %d\n", section.getTopWords() != null ? + section.getTopWords().size() : 0)); + report.append(String.format(" Sentences: %d\n", section.getSortedSentences() != null ? + section.getSortedSentences().size() : 0)); + report.append("\n"); + }); + } + + Files.write(filePath, report.toString().getBytes()); + log.info("Saved summary report to file: {}", filePath.toAbsolutePath()); + } + + public AggregatedResult getResult(String aggregationId) { + return storedResults.get(aggregationId); + } + + public List getAllResults() { + return new ArrayList<>(storedResults.values()); + } + + public List getStoredResultIds() { + return new ArrayList<>(storedResults.keySet()); + } + + public boolean deleteResult(String aggregationId) { + AggregatedResult removed = storedResults.remove(aggregationId); + if (removed != null) { + log.info("Deleted result for aggregation: {}", aggregationId); + return true; + } + return false; + } + + public void clearAllResults() { + storedResults.clear(); + log.info("Cleared all stored results"); + } +} \ No newline at end of file diff --git a/aggregator/src/main/resources/application.properties b/aggregator/src/main/resources/application.properties index e00bb1d..870a4a6 100644 --- a/aggregator/src/main/resources/application.properties +++ b/aggregator/src/main/resources/application.properties @@ -5,7 +5,20 @@ spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest -rabbitmq.exchange.name=textTopic +# Results queue configuration (listening to worker results) +rabbitmq.results.exchange.name=resultsExchange +rabbitmq.results.queue.name=resultsQueue +rabbitmq.results.routing.key=results + +# Session info queue configuration (receiving expected task counts) +rabbitmq.session.exchange.name=sessionInfoExchange +rabbitmq.session.queue.name=sessionInfoQueue +rabbitmq.session.routing.key=session + +# Final results queue configuration (publishing aggregated results) +rabbitmq.final.exchange.name=finalResultsExchange +rabbitmq.final.queue.name=finalResultsQueue +rabbitmq.final.routing.key=final # Swagger/OpenAPI Configuration springdoc.api-docs.path=/api-docs From 21fecd9226257a44158aa1b8a46accb4b9e2563a Mon Sep 17 00:00:00 2001 From: railolog Date: Tue, 23 Dec 2025 01:55:24 +0300 Subject: [PATCH 10/11] polish implementation --- .gitignore | 2 + .../ru/ifmo/configuration/RabbitConfig.java | 1 - .../ifmo/controller/AggregatorController.java | 102 ---------- .../java/ru/ifmo/dto/AggregatedResult.java | 3 +- .../ru/ifmo/service/AggregationService.java | 3 +- .../ru/ifmo/service/ResultStorageService.java | 9 +- producer/pom.xml | 8 + .../main/java/ru/ifmo/HelloController.java | 50 ----- .../ru/ifmo/configuration/RabbitConfig.java | 28 ++- .../main/java/ru/ifmo/dto/SessionInfo.java | 10 + .../src/main/java/ru/ifmo/dto/TextTask.java | 11 ++ .../ifmo/service/MessagePublisherService.java | 80 ++++++++ .../service/StartupTextProcessingService.java | 58 ++++++ .../service/TextProcessingJobService.java | 119 ++++++++++++ .../ru/ifmo/service/TextSplitterService.java | 178 ++++++++++++++++++ .../src/main/resources/application.properties | 12 +- .../main/java/ru/ifmo/MessagesListener.java | 12 +- .../ru/ifmo/controller/TestController.java | 34 ---- 18 files changed, 514 insertions(+), 206 deletions(-) delete mode 100644 aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java delete mode 100644 producer/src/main/java/ru/ifmo/HelloController.java create mode 100644 producer/src/main/java/ru/ifmo/dto/SessionInfo.java create mode 100644 producer/src/main/java/ru/ifmo/dto/TextTask.java create mode 100644 producer/src/main/java/ru/ifmo/service/MessagePublisherService.java create mode 100644 producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java create mode 100644 producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java create mode 100644 producer/src/main/java/ru/ifmo/service/TextSplitterService.java delete mode 100644 worker/src/main/java/ru/ifmo/controller/TestController.java diff --git a/.gitignore b/.gitignore index 0a28c18..55c7668 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ /.idea/yatool.xml .idea/ /worker/target/ +/aggregator/target/ +/producer/src/main/resources/sample-text.txt diff --git a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java index acb2977..2e91291 100644 --- a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -11,7 +11,6 @@ @Configuration public class RabbitConfig { - @Bean public Jackson2JsonMessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } diff --git a/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java b/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java deleted file mode 100644 index f9f04a3..0000000 --- a/aggregator/src/main/java/ru/ifmo/controller/AggregatorController.java +++ /dev/null @@ -1,102 +0,0 @@ -package ru.ifmo.controller; - -import lombok.RequiredArgsConstructor; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; -import ru.ifmo.dto.AggregatedResult; -import ru.ifmo.service.AggregationService; -import ru.ifmo.service.ResultStorageService; - -import java.util.List; - -@RestController -@RequestMapping("/api/aggregator") -@RequiredArgsConstructor -public class AggregatorController { - - private final AggregationService aggregationService; - private final ResultStorageService resultStorageService; - - @GetMapping("/sessions") - public ResponseEntity> getActiveSessions() { - List sessions = aggregationService.getActiveSessions(); - return ResponseEntity.ok(sessions); - } - - @GetMapping("/sessions/{sessionId}/count") - public ResponseEntity getResultCount(@PathVariable String sessionId) { - int count = aggregationService.getResultCount(sessionId); - return ResponseEntity.ok(count); - } - - @PostMapping("/sessions/{sessionId}/expected/{expectedCount}") - public ResponseEntity setExpectedTaskCount( - @PathVariable String sessionId, - @PathVariable int expectedCount) { - aggregationService.setExpectedTaskCount(sessionId, expectedCount); - return ResponseEntity.ok().build(); - } - - @GetMapping("/sessions/{sessionId}/ready") - public ResponseEntity isSessionReady(@PathVariable String sessionId) { - boolean ready = aggregationService.isReadyForAggregation(sessionId); - return ResponseEntity.ok(ready); - } - - @PostMapping("/sessions/{sessionId}/aggregate") - public ResponseEntity triggerAggregation(@PathVariable String sessionId) { - AggregatedResult result = aggregationService.aggregateResults(sessionId); - if (result != null) { - resultStorageService.storeResult(result); - return ResponseEntity.ok(result); - } - return ResponseEntity.notFound().build(); - } - - @DeleteMapping("/sessions/{sessionId}") - public ResponseEntity clearSession(@PathVariable String sessionId) { - aggregationService.clearSession(sessionId); - return ResponseEntity.ok().build(); - } - - @GetMapping("/results") - public ResponseEntity> getStoredResultIds() { - List resultIds = resultStorageService.getStoredResultIds(); - return ResponseEntity.ok(resultIds); - } - - @GetMapping("/results/{aggregationId}") - public ResponseEntity getResult(@PathVariable String aggregationId) { - AggregatedResult result = resultStorageService.getResult(aggregationId); - if (result != null) { - return ResponseEntity.ok(result); - } - return ResponseEntity.notFound().build(); - } - - @GetMapping("/results/all") - public ResponseEntity> getAllResults() { - List results = resultStorageService.getAllResults(); - return ResponseEntity.ok(results); - } - - @DeleteMapping("/results/{aggregationId}") - public ResponseEntity deleteResult(@PathVariable String aggregationId) { - boolean deleted = resultStorageService.deleteResult(aggregationId); - if (deleted) { - return ResponseEntity.ok().build(); - } - return ResponseEntity.notFound().build(); - } - - @DeleteMapping("/results") - public ResponseEntity clearAllResults() { - resultStorageService.clearAllResults(); - return ResponseEntity.ok().build(); - } - - @GetMapping("/health") - public ResponseEntity health() { - return ResponseEntity.ok("Aggregator is running"); - } -} \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java index 4bdd073..45899f4 100644 --- a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java +++ b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java @@ -1,6 +1,5 @@ package ru.ifmo.dto; -import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -9,7 +8,7 @@ @Data public class AggregatedResult { private String aggregationId; - private LocalDateTime timestamp; +// private LocalDateTime timestamp; private int totalSections; private List processedTaskIds; diff --git a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java index f02b58b..e79a626 100644 --- a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java +++ b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java @@ -1,6 +1,5 @@ package ru.ifmo.service; -import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -67,7 +66,7 @@ public AggregatedResult aggregateResults(String sessionId) { AggregatedResult aggregated = new AggregatedResult(); aggregated.setAggregationId(sessionId); - aggregated.setTimestamp(LocalDateTime.now()); +// aggregated.setTimestamp(LocalDateTime.now()); aggregated.setTotalSections(results.size()); aggregated.setProcessedTaskIds(results.stream() .map(TextProcessingResult::getTaskId) diff --git a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java index 3ca3071..43938ee 100644 --- a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java +++ b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java @@ -4,7 +4,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -50,14 +49,14 @@ public void storeResult(AggregatedResult result) { } private void saveToFile(AggregatedResult result) throws IOException { - String timestamp = result.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); - String filename = String.format("%s_%s.json", result.getAggregationId(), timestamp); +// String timestamp = result.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); + String filename = String.format("%s_%s.json", result.getAggregationId(), System.currentTimeMillis()); Path filePath = Paths.get(resultsDirectory, filename); objectMapper.writerWithDefaultPrettyPrinter().writeValue(filePath.toFile(), result); log.info("Saved result to file: {}", filePath.toAbsolutePath()); - saveSummaryReport(result, timestamp); + saveSummaryReport(result, "timestamp"); } private void saveSummaryReport(AggregatedResult result, String timestamp) throws IOException { @@ -67,7 +66,7 @@ private void saveSummaryReport(AggregatedResult result, String timestamp) throws StringBuilder report = new StringBuilder(); report.append("=== TEXT PROCESSING AGGREGATION REPORT ===\n"); report.append("Aggregation ID: ").append(result.getAggregationId()).append("\n"); - report.append("Timestamp: ").append(result.getTimestamp()).append("\n"); +// report.append("Timestamp: ").append(result.getTimestamp()).append("\n"); report.append("Total Sections Processed: ").append(result.getTotalSections()).append("\n"); report.append("Processed Task IDs: ").append(String.join(", ", result.getProcessedTaskIds())).append("\n\n"); diff --git a/producer/pom.xml b/producer/pom.xml index a5e3c4c..d847bfe 100644 --- a/producer/pom.xml +++ b/producer/pom.xml @@ -43,6 +43,14 @@ springdoc-openapi-starter-webmvc-ui 2.2.0 + + com.fasterxml.jackson.core + jackson-databind + + + com.fasterxml.jackson.core + jackson-core + diff --git a/producer/src/main/java/ru/ifmo/HelloController.java b/producer/src/main/java/ru/ifmo/HelloController.java deleted file mode 100644 index e53b0f0..0000000 --- a/producer/src/main/java/ru/ifmo/HelloController.java +++ /dev/null @@ -1,50 +0,0 @@ -package ru.ifmo; - -import io.swagger.v3.oas.annotations.Operation; -import io.swagger.v3.oas.annotations.Parameter; -import io.swagger.v3.oas.annotations.responses.ApiResponse; -import io.swagger.v3.oas.annotations.responses.ApiResponses; -import io.swagger.v3.oas.annotations.tags.Tag; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.RestController; - -@RestController -@Tag(name = "Producer Controller", description = "API for message production and basic operations") -public class HelloController { - - private final RabbitTemplate rabbitTemplate; - private final String exchangeName; - - public HelloController( - @Value("${rabbitmq.exchange.name}") String exchangeName, - RabbitTemplate rabbitTemplate - ) { - this.rabbitTemplate = rabbitTemplate; - this.exchangeName = exchangeName; - } - - @RequestMapping("/") - @Operation(summary = "Get greeting message", description = "Returns a simple greeting message") - @ApiResponses(value = { - @ApiResponse(responseCode = "200", description = "Successfully retrieved greeting message") - }) - String hello() { - return "Hello World!"; - } - - @RequestMapping("/send") - @Operation(summary = "Send message to RabbitMQ", description = "Sends a message to the configured RabbitMQ exchange") - @ApiResponses(value = { - @ApiResponse(responseCode = "200", description = "Message sent successfully"), - @ApiResponse(responseCode = "400", description = "Invalid message parameter") - }) - public void send( - @Parameter(description = "Message to be sent to RabbitMQ queue", required = true) - @RequestParam("message") String message - ) { - rabbitTemplate.convertAndSend(exchangeName, "", message); - } -} diff --git a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java index 956647a..b526c1d 100644 --- a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -1,6 +1,9 @@ package ru.ifmo.configuration; -import org.springframework.amqp.core.FanoutExchange; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -9,9 +12,28 @@ public class RabbitConfig { @Bean - public FanoutExchange fanoutExchange( - @Value("${rabbitmq.exchange.name}") String exchangeName + public Jackson2JsonMessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate template = new RabbitTemplate(connectionFactory); + template.setMessageConverter(messageConverter()); + return template; + } + + @Bean + public FanoutExchange tasksExchange( + @Value("${rabbitmq.tasks.exchange.name}") String exchangeName ) { return new FanoutExchange(exchangeName); } + + @Bean + public DirectExchange sessionInfoExchange( + @Value("${rabbitmq.session.exchange.name}") String exchangeName + ) { + return new DirectExchange(exchangeName); + } } diff --git a/producer/src/main/java/ru/ifmo/dto/SessionInfo.java b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java new file mode 100644 index 0000000..039c0a1 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -0,0 +1,10 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class SessionInfo { + private String sessionId; + private int expectedTaskCount; + private String description; +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/dto/TextTask.java b/producer/src/main/java/ru/ifmo/dto/TextTask.java new file mode 100644 index 0000000..9eda89a --- /dev/null +++ b/producer/src/main/java/ru/ifmo/dto/TextTask.java @@ -0,0 +1,11 @@ +package ru.ifmo.dto; + +import lombok.Data; + +@Data +public class TextTask { + private String taskId; + private String text; + private int topN; // for top-N words task, default will be handled in service + private String nameReplacement; // for name replacement task, default will be handled in service +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java new file mode 100644 index 0000000..78eb915 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java @@ -0,0 +1,80 @@ +package ru.ifmo.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.dto.TextTask; + +import java.util.List; + +@Service +@RequiredArgsConstructor +@Slf4j +public class MessagePublisherService { + + private final RabbitTemplate rabbitTemplate; + + @Value("${rabbitmq.tasks.exchange.name}") + private String tasksExchange; + + @Value("${rabbitmq.session.exchange.name}") + private String sessionExchange; + + @Value("${rabbitmq.session.routing.key}") + private String sessionRoutingKey; + + public void publishSessionInfo(SessionInfo sessionInfo) { + try { + rabbitTemplate.convertAndSend(sessionExchange, sessionRoutingKey, sessionInfo); + log.info("Published session info: {} with {} expected tasks", + sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); + } catch (Exception e) { + log.error("Failed to publish session info for {}: {}", + sessionInfo.getSessionId(), e.getMessage(), e); + throw new RuntimeException("Failed to publish session info", e); + } + } + + public void publishTasks(List tasks) { + if (tasks == null || tasks.isEmpty()) { + log.warn("No tasks to publish"); + return; + } + + log.info("Publishing {} tasks to exchange {}", tasks.size(), tasksExchange); + + int successCount = 0; + int failureCount = 0; + + for (TextTask task : tasks) { + try { + rabbitTemplate.convertAndSend(tasksExchange, "", task); + successCount++; + log.debug("Published task: {}", task.getTaskId()); + } catch (Exception e) { + failureCount++; + log.error("Failed to publish task {}: {}", task.getTaskId(), e.getMessage(), e); + } + } + + log.info("Task publishing completed. Success: {}, Failures: {}", successCount, failureCount); + + if (failureCount > 0) { + throw new RuntimeException(String.format("Failed to publish %d out of %d tasks", + failureCount, tasks.size())); + } + } + + public void publishTask(TextTask task) { + try { + rabbitTemplate.convertAndSend(tasksExchange, "", task); + log.info("Published single task: {}", task.getTaskId()); + } catch (Exception e) { + log.error("Failed to publish task {}: {}", task.getTaskId(), e.getMessage(), e); + throw new RuntimeException("Failed to publish task", e); + } + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java new file mode 100644 index 0000000..968533f --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java @@ -0,0 +1,58 @@ +package ru.ifmo.service; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.core.io.ClassPathResource; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +@Slf4j +public class StartupTextProcessingService { + + private final TextProcessingJobService textProcessingJobService; + + @EventListener(ApplicationReadyEvent.class) + public void processDefaultTextOnStartup() { + try { + log.info("Application started, checking for default text processing..."); + + // Check if sample text file exists and process it + ClassPathResource resource = new ClassPathResource("sample-text.txt"); + if (resource.exists()) { + log.info("Found sample text file, processing..."); + + // Copy to temp file for processing + Path tempFile = Files.createTempFile("sample-text", ".txt"); + Files.copy(resource.getInputStream(), tempFile, + java.nio.file.StandardCopyOption.REPLACE_EXISTING); + + // Process the file + String sessionId = textProcessingJobService.processTextFile( + tempFile.toString(), + TextProcessingJobService.SplitStrategy.BY_PARAGRAPHS, + 1000 + ); + + log.info("Sample text processing started with session ID: {}", sessionId); + + // Clean up temp file + Files.deleteIfExists(tempFile); + + } else { + log.info("No sample text file found, skipping automatic processing"); + } + + } catch (IOException e) { + log.error("Error processing sample text file: {}", e.getMessage()); + } catch (Exception e) { + log.error("Unexpected error during startup text processing: {}", e.getMessage(), e); + } + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java new file mode 100644 index 0000000..6874e27 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java @@ -0,0 +1,119 @@ +package ru.ifmo.service; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.SessionInfo; +import ru.ifmo.dto.TextTask; + +import java.io.IOException; +import java.util.List; + +@Service +@RequiredArgsConstructor +@Slf4j +public class TextProcessingJobService { + + private final TextSplitterService textSplitterService; + private final MessagePublisherService messagePublisherService; + + public String processTextFile(String filePath) throws IOException { + return processTextFile(filePath, SplitStrategy.BY_PARAGRAPHS, 1000); + } + + public String processTextFile(String filePath, SplitStrategy strategy, int splitSize) throws IOException { + log.info("Starting text processing job for file: {}", filePath); + + // Generate session ID + String sessionId = textSplitterService.generateSessionId(); + log.info("Generated session ID: {}", sessionId); + + // Read and split the text file + List tasks = splitTextFile(filePath, sessionId, strategy, splitSize); + + if (tasks.isEmpty()) { + throw new IllegalArgumentException("No tasks generated from file: " + filePath); + } + + // Send session info to aggregator + SessionInfo sessionInfo = new SessionInfo(); + sessionInfo.setSessionId(sessionId); + sessionInfo.setExpectedTaskCount(tasks.size()); + sessionInfo.setDescription("Text processing job for file: " + filePath); + + messagePublisherService.publishSessionInfo(sessionInfo); + + // Send tasks to workers + messagePublisherService.publishTasks(tasks); + + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); + return sessionId; + } + + public String processTextContent(String text, String description) { + return processTextContent(text, description, SplitStrategy.BY_PARAGRAPHS, 1000); + } + + public String processTextContent(String text, String description, SplitStrategy strategy, int splitSize) { + log.info("Starting text processing job for content: {}", description); + + // Generate session ID + String sessionId = textSplitterService.generateSessionId(); + log.info("Generated session ID: {}", sessionId); + + // Split the text content + List tasks = splitTextContent(text, sessionId, strategy, splitSize); + + if (tasks.isEmpty()) { + throw new IllegalArgumentException("No tasks generated from text content"); + } + + // Send session info to aggregator + SessionInfo sessionInfo = new SessionInfo(); + sessionInfo.setSessionId(sessionId); + sessionInfo.setExpectedTaskCount(tasks.size()); + sessionInfo.setDescription(description); + + messagePublisherService.publishSessionInfo(sessionInfo); + + // Send tasks to workers + messagePublisherService.publishTasks(tasks); + + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); + return sessionId; + } + + private List splitTextFile(String filePath, String sessionId, SplitStrategy strategy, int splitSize) throws IOException { + switch (strategy) { + case BY_PARAGRAPHS: + return textSplitterService.splitTextFile(filePath, sessionId); + case BY_SENTENCES: + String content = java.nio.file.Files.readString(java.nio.file.Paths.get(filePath)); + return textSplitterService.splitTextBySentences(content, sessionId, splitSize); + case BY_WORDS: + content = java.nio.file.Files.readString(java.nio.file.Paths.get(filePath)); + return textSplitterService.splitTextByWords(content, sessionId, splitSize); + default: + throw new IllegalArgumentException("Unknown split strategy: " + strategy); + } + } + + private List splitTextContent(String text, String sessionId, SplitStrategy strategy, int splitSize) { + switch (strategy) { + case BY_PARAGRAPHS: + return textSplitterService.splitTextIntoTasks(text, sessionId); + case BY_SENTENCES: + return textSplitterService.splitTextBySentences(text, sessionId, splitSize); + case BY_WORDS: + return textSplitterService.splitTextByWords(text, sessionId, splitSize); + default: + throw new IllegalArgumentException("Unknown split strategy: " + strategy); + } + } + + public enum SplitStrategy { + BY_PARAGRAPHS, + BY_SENTENCES, + BY_WORDS + } +} \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/TextSplitterService.java b/producer/src/main/java/ru/ifmo/service/TextSplitterService.java new file mode 100644 index 0000000..5e19a30 --- /dev/null +++ b/producer/src/main/java/ru/ifmo/service/TextSplitterService.java @@ -0,0 +1,178 @@ +package ru.ifmo.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +import ru.ifmo.dto.TextTask; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +@Service +@Slf4j +public class TextSplitterService { + + @Value("${text.processing.chunk.size:1000}") + private int chunkSize; + + @Value("${text.processing.top.words:10}") + private int topWords; + + @Value("${text.processing.name.replacement:[NAME]}") + private String nameReplacement; + + public List splitTextFile(String filePath, String sessionId) throws IOException { + log.info("Reading text file: {}", filePath); + + Path path = Paths.get(filePath); + if (!Files.exists(path)) { + throw new IOException("File not found: " + filePath); + } + + String content = Files.readString(path); + log.info("File read successfully. Content length: {} characters", content.length()); + + return splitTextIntoTasks(content, sessionId); + } + + public List splitTextIntoTasks(String text, String sessionId) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + // Split by paragraphs first + String[] paragraphs = text.split("\\n\\s*\\n"); + + StringBuilder currentChunk = new StringBuilder(); + int taskCounter = 1; + + for (String paragraph : paragraphs) { + paragraph = paragraph.trim(); + if (paragraph.isEmpty()) { + continue; + } + + // If adding this paragraph would exceed chunk size, create a task + if (currentChunk.length() > 0 && + currentChunk.length() + paragraph.length() + 2 > chunkSize) { + + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter++)); + currentChunk = new StringBuilder(); + } + + if (currentChunk.length() > 0) { + currentChunk.append("\n\n"); + } + currentChunk.append(paragraph); + } + + // Add the last chunk if it's not empty + if (currentChunk.length() > 0) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter)); + } + + log.info("Split text into {} tasks for session {}", tasks.size(), sessionId); + return tasks; + } + + public List splitTextBySentences(String text, String sessionId, int sentencesPerTask) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + // Split by sentences + String[] sentences = text.split("(?<=[.!?])\\s+"); + + List currentChunk = new ArrayList<>(); + int taskCounter = 1; + + for (String sentence : sentences) { + sentence = sentence.trim(); + if (sentence.isEmpty()) { + continue; + } + + currentChunk.add(sentence); + + if (currentChunk.size() >= sentencesPerTask) { + String chunkText = String.join(" ", currentChunk); + tasks.add(createTextTask(chunkText, sessionId, taskCounter++)); + currentChunk.clear(); + } + } + + // Add the last chunk if it's not empty + if (!currentChunk.isEmpty()) { + String chunkText = String.join(" ", currentChunk); + tasks.add(createTextTask(chunkText, sessionId, taskCounter)); + } + + log.info("Split text into {} tasks by sentences for session {}", tasks.size(), sessionId); + return tasks; + } + + public List splitTextByWords(String text, String sessionId, int wordsPerTask) { + List tasks = new ArrayList<>(); + + if (text == null || text.trim().isEmpty()) { + log.warn("Empty text provided for splitting"); + return tasks; + } + + String[] words = text.split("\\s+"); + + StringBuilder currentChunk = new StringBuilder(); + int wordCount = 0; + int taskCounter = 1; + + for (String word : words) { + if (word.trim().isEmpty()) { + continue; + } + + if (wordCount > 0) { + currentChunk.append(" "); + } + currentChunk.append(word); + wordCount++; + + if (wordCount >= wordsPerTask) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter++)); + currentChunk = new StringBuilder(); + wordCount = 0; + } + } + + // Add the last chunk if it's not empty + if (wordCount > 0) { + tasks.add(createTextTask(currentChunk.toString(), sessionId, taskCounter)); + } + + log.info("Split text into {} tasks by words for session {}", tasks.size(), sessionId); + return tasks; + } + + private TextTask createTextTask(String text, String sessionId, int taskNumber) { + TextTask task = new TextTask(); + task.setTaskId(String.format("%s-task-%03d", sessionId, taskNumber)); + task.setText(text); + task.setTopN(topWords); + task.setNameReplacement(nameReplacement); + return task; + } + + public String generateSessionId() { + return "session-" + UUID.randomUUID().toString().substring(0, 8); + } +} \ No newline at end of file diff --git a/producer/src/main/resources/application.properties b/producer/src/main/resources/application.properties index 2f0d3bb..702fec0 100644 --- a/producer/src/main/resources/application.properties +++ b/producer/src/main/resources/application.properties @@ -5,7 +5,17 @@ spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest -rabbitmq.exchange.name=textTopic +# Tasks exchange configuration (for sending tasks to workers) +rabbitmq.tasks.exchange.name=textTopic + +# Session info exchange configuration (for sending session info to aggregator) +rabbitmq.session.exchange.name=sessionInfoExchange +rabbitmq.session.routing.key=session + +# Text processing configuration +text.processing.chunk.size=1000 +text.processing.top.words=10 +text.processing.name.replacement=[NAME] # Swagger/OpenAPI Configuration springdoc.api-docs.path=/api-docs diff --git a/worker/src/main/java/ru/ifmo/MessagesListener.java b/worker/src/main/java/ru/ifmo/MessagesListener.java index c5aeda5..9d26e9a 100644 --- a/worker/src/main/java/ru/ifmo/MessagesListener.java +++ b/worker/src/main/java/ru/ifmo/MessagesListener.java @@ -30,20 +30,20 @@ public class MessagesListener { @RabbitListener(queues = "${rabbitmq.queue.name}") public void receiveMessage(String message) { log.info("Received message: {}", message); - + try { // Parse the incoming message as TextTask TextTask task = objectMapper.readValue(message, TextTask.class); - log.info("Processing task: {} of type: {}", task.getTaskId(), task.getTaskType()); - + log.info("Processing task: {}", task.getTaskId()); + // Process the task TextProcessingResult result = textProcessingService.processTask(task); - + // Send result back to results queue sendResult(result); - + log.info("Successfully processed task: {}", task.getTaskId()); - + } catch (JsonProcessingException e) { log.error("Failed to parse message as TextTask: {}", e.getMessage()); } catch (Exception e) { diff --git a/worker/src/main/java/ru/ifmo/controller/TestController.java b/worker/src/main/java/ru/ifmo/controller/TestController.java deleted file mode 100644 index 722ae00..0000000 --- a/worker/src/main/java/ru/ifmo/controller/TestController.java +++ /dev/null @@ -1,34 +0,0 @@ -package ru.ifmo.controller; - -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.*; -import ru.ifmo.dto.TextTask; -import ru.ifmo.dto.TextProcessingResult; -import ru.ifmo.service.TextProcessingService; - -@RestController -@RequestMapping("/api/test") -@RequiredArgsConstructor -public class TestController { - - private final TextProcessingService textProcessingService; - - @PostMapping("/process") - public TextProcessingResult processText(@RequestBody TextTask task) { - return textProcessingService.processTask(task); - } - - @GetMapping("/demo") - public TextProcessingResult demoProcessing() { - TextTask task = new TextTask(); - task.setTaskId("demo-001"); - task.setTopN(5); - task.setNameReplacement("[PERSON]"); - task.setText("Hello, my name is John Smith. This is a wonderful day! " + - "I love programming and John enjoys reading books. " + - "Mary said she feels great today. " + - "The weather is terrible but we are happy."); - - return textProcessingService.processTask(task); - } -} \ No newline at end of file From 6b16e3f14c8abd148db01390df226a8db5861704 Mon Sep 17 00:00:00 2001 From: railolog Date: Tue, 23 Dec 2025 03:22:35 +0300 Subject: [PATCH 11/11] process big file --- .gitignore | 2 + aggregator/pom.xml | 5 ++ .../src/main/java/ru/ifmo/AppConfig.java | 9 +++ .../ru/ifmo/configuration/RabbitConfig.java | 9 --- .../java/ru/ifmo/dto/AggregatedResult.java | 5 +- .../main/java/ru/ifmo/dto/SessionInfo.java | 3 + .../ru/ifmo/listener/SessionInfoListener.java | 9 ++- .../ru/ifmo/service/AggregationService.java | 25 ++++++++- .../ru/ifmo/service/ResultStorageService.java | 25 +++++++-- producer/pom.xml | 4 ++ producer/src/main/java/ru/ifmo/AppConfig.java | 9 +++ .../ru/ifmo/configuration/RabbitConfig.java | 4 +- .../main/java/ru/ifmo/dto/SessionInfo.java | 3 + .../ifmo/service/MessagePublisherService.java | 19 ++----- .../service/StartupTextProcessingService.java | 4 +- .../service/TextProcessingJobService.java | 55 +++++++++---------- .../src/main/resources/application.properties | 1 + worker/pom.xml | 4 ++ .../main/java/ru/ifmo/MessagesListener.java | 5 +- .../ru/ifmo/configuration/RabbitConfig.java | 21 +------ 20 files changed, 137 insertions(+), 84 deletions(-) diff --git a/.gitignore b/.gitignore index 55c7668..b657919 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ /worker/target/ /aggregator/target/ /producer/src/main/resources/sample-text.txt +/aggregator/results/ +/producer/src/main/resources/big.txt diff --git a/aggregator/pom.xml b/aggregator/pom.xml index 3479cbd..7eac84e 100644 --- a/aggregator/pom.xml +++ b/aggregator/pom.xml @@ -46,6 +46,11 @@ com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + + com.fasterxml.jackson.core jackson-core diff --git a/aggregator/src/main/java/ru/ifmo/AppConfig.java b/aggregator/src/main/java/ru/ifmo/AppConfig.java index af5370e..eeb2748 100644 --- a/aggregator/src/main/java/ru/ifmo/AppConfig.java +++ b/aggregator/src/main/java/ru/ifmo/AppConfig.java @@ -1,7 +1,16 @@ package ru.ifmo; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { + @Bean + public ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + return mapper; + } } \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java index 2e91291..6e1bee2 100644 --- a/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/aggregator/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -36,15 +36,6 @@ public Queue resultsQueue( return new Queue(queueName); } - @Bean - public Binding resultsBinding( - Queue resultsQueue, - DirectExchange resultsExchange, - @Value("${rabbitmq.results.routing.key}") String routingKey - ) { - return BindingBuilder.bind(resultsQueue).to(resultsExchange).with(routingKey); - } - @Bean public DirectExchange sessionInfoExchange( @Value("${rabbitmq.session.exchange.name}") String exchangeName diff --git a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java index 45899f4..4541378 100644 --- a/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java +++ b/aggregator/src/main/java/ru/ifmo/dto/AggregatedResult.java @@ -1,5 +1,6 @@ package ru.ifmo.dto; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; @@ -8,7 +9,9 @@ @Data public class AggregatedResult { private String aggregationId; -// private LocalDateTime timestamp; + private LocalDateTime startTime; + private LocalDateTime endTime; + private Long processingDurationMs; private int totalSections; private List processedTaskIds; diff --git a/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java index 039c0a1..9e688d0 100644 --- a/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java +++ b/aggregator/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -1,5 +1,7 @@ package ru.ifmo.dto; +import java.time.LocalDateTime; + import lombok.Data; @Data @@ -7,4 +9,5 @@ public class SessionInfo { private String sessionId; private int expectedTaskCount; private String description; + private LocalDateTime startTime; } \ No newline at end of file diff --git a/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java index 397322a..b3bc378 100644 --- a/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java +++ b/aggregator/src/main/java/ru/ifmo/listener/SessionInfoListener.java @@ -15,7 +15,7 @@ public class SessionInfoListener { private final AggregationService aggregationService; - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper; @RabbitListener(queues = "${rabbitmq.session.queue.name}") public void receiveSessionInfo(String message) { @@ -30,6 +30,13 @@ public void receiveSessionInfo(String message) { sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount() ); + + if (sessionInfo.getStartTime() != null) { + aggregationService.setSessionStartTime( + sessionInfo.getSessionId(), + sessionInfo.getStartTime() + ); + } log.info("Successfully registered session {} with {} expected tasks", sessionInfo.getSessionId(), sessionInfo.getExpectedTaskCount()); diff --git a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java index e79a626..8e58ca0 100644 --- a/aggregator/src/main/java/ru/ifmo/service/AggregationService.java +++ b/aggregator/src/main/java/ru/ifmo/service/AggregationService.java @@ -1,5 +1,7 @@ package ru.ifmo.service; +import java.time.Duration; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Comparator; import java.util.HashMap; @@ -23,11 +25,18 @@ public class AggregationService { private final Map> aggregationSessions = new ConcurrentHashMap<>(); private final Map expectedTaskCounts = new ConcurrentHashMap<>(); + + private final Map sessionStartTimes = new ConcurrentHashMap<>(); public void setExpectedTaskCount(String sessionId, int expectedCount) { expectedTaskCounts.put(sessionId, expectedCount); log.info("Set expected task count for session {}: {}", sessionId, expectedCount); } + + public void setSessionStartTime(String sessionId, LocalDateTime startTime) { + sessionStartTimes.put(sessionId, startTime); + log.info("Set start time for session {}: {}", sessionId, startTime); + } public void addResult(String sessionId, TextProcessingResult result) { aggregationSessions.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(result); @@ -45,7 +54,7 @@ public boolean isReadyForAggregation(String sessionId) { } int currentCount = getResultCount(sessionId); - boolean ready = currentCount >= expectedCount; + boolean ready = currentCount + 2 >= expectedCount; if (ready) { log.info("Session {} is ready for aggregation: {}/{} tasks completed", @@ -66,7 +75,18 @@ public AggregatedResult aggregateResults(String sessionId) { AggregatedResult aggregated = new AggregatedResult(); aggregated.setAggregationId(sessionId); -// aggregated.setTimestamp(LocalDateTime.now()); + + LocalDateTime endTime = LocalDateTime.now(); + aggregated.setEndTime(endTime); + + LocalDateTime startTime = sessionStartTimes.get(sessionId); + if (startTime != null) { + aggregated.setStartTime(startTime); + Duration processingDuration = Duration.between(startTime, endTime); + aggregated.setProcessingDurationMs(processingDuration.toMillis()); + log.info("Processing duration for session {}: {} ms", sessionId, processingDuration.toMillis()); + } + aggregated.setTotalSections(results.size()); aggregated.setProcessedTaskIds(results.stream() .map(TextProcessingResult::getTaskId) @@ -193,6 +213,7 @@ private SectionResult convertToSectionResult(TextProcessingResult result) { public void clearSession(String sessionId) { aggregationSessions.remove(sessionId); expectedTaskCounts.remove(sessionId); + sessionStartTimes.remove(sessionId); log.info("Cleared session {}", sessionId); } diff --git a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java index 43938ee..53449b4 100644 --- a/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java +++ b/aggregator/src/main/java/ru/ifmo/service/ResultStorageService.java @@ -4,6 +4,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; @@ -18,11 +19,12 @@ @Slf4j public class ResultStorageService { - private final ObjectMapper objectMapper = new ObjectMapper(); + private final ObjectMapper objectMapper; private final ConcurrentMap storedResults = new ConcurrentHashMap<>(); private final String resultsDirectory = "results"; - public ResultStorageService() { + public ResultStorageService(ObjectMapper objectMapper) { + this.objectMapper = objectMapper; try { Path resultsPath = Paths.get(resultsDirectory); if (!Files.exists(resultsPath)) { @@ -49,14 +51,16 @@ public void storeResult(AggregatedResult result) { } private void saveToFile(AggregatedResult result) throws IOException { -// String timestamp = result.getTimestamp().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")); - String filename = String.format("%s_%s.json", result.getAggregationId(), System.currentTimeMillis()); + String timestamp = result.getEndTime() != null ? + result.getEndTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss")) : + String.valueOf(System.currentTimeMillis()); + String filename = String.format("%s_%s.json", result.getAggregationId(), timestamp); Path filePath = Paths.get(resultsDirectory, filename); objectMapper.writerWithDefaultPrettyPrinter().writeValue(filePath.toFile(), result); log.info("Saved result to file: {}", filePath.toAbsolutePath()); - saveSummaryReport(result, "timestamp"); + saveSummaryReport(result, timestamp); } private void saveSummaryReport(AggregatedResult result, String timestamp) throws IOException { @@ -66,7 +70,16 @@ private void saveSummaryReport(AggregatedResult result, String timestamp) throws StringBuilder report = new StringBuilder(); report.append("=== TEXT PROCESSING AGGREGATION REPORT ===\n"); report.append("Aggregation ID: ").append(result.getAggregationId()).append("\n"); -// report.append("Timestamp: ").append(result.getTimestamp()).append("\n"); + if (result.getStartTime() != null) { + report.append("Start Time: ").append(result.getStartTime()).append("\n"); + } + if (result.getEndTime() != null) { + report.append("End Time: ").append(result.getEndTime()).append("\n"); + } + if (result.getProcessingDurationMs() != null) { + report.append("Processing Duration: ").append(result.getProcessingDurationMs()).append(" ms (") + .append(String.format("%.2f", result.getProcessingDurationMs() / 1000.0)).append(" seconds)\n"); + } report.append("Total Sections Processed: ").append(result.getTotalSections()).append("\n"); report.append("Processed Task IDs: ").append(String.join(", ", result.getProcessedTaskIds())).append("\n\n"); diff --git a/producer/pom.xml b/producer/pom.xml index d847bfe..3ef3c4d 100644 --- a/producer/pom.xml +++ b/producer/pom.xml @@ -47,6 +47,10 @@ com.fasterxml.jackson.core jackson-databind + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + com.fasterxml.jackson.core jackson-core diff --git a/producer/src/main/java/ru/ifmo/AppConfig.java b/producer/src/main/java/ru/ifmo/AppConfig.java index af5370e..eeb2748 100644 --- a/producer/src/main/java/ru/ifmo/AppConfig.java +++ b/producer/src/main/java/ru/ifmo/AppConfig.java @@ -1,7 +1,16 @@ package ru.ifmo; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AppConfig { + @Bean + public ObjectMapper objectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + return mapper; + } } \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java index b526c1d..80c06e0 100644 --- a/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/producer/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -24,10 +24,10 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { } @Bean - public FanoutExchange tasksExchange( + public DirectExchange tasksExchange( @Value("${rabbitmq.tasks.exchange.name}") String exchangeName ) { - return new FanoutExchange(exchangeName); + return new DirectExchange(exchangeName); } @Bean diff --git a/producer/src/main/java/ru/ifmo/dto/SessionInfo.java b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java index 039c0a1..9e688d0 100644 --- a/producer/src/main/java/ru/ifmo/dto/SessionInfo.java +++ b/producer/src/main/java/ru/ifmo/dto/SessionInfo.java @@ -1,5 +1,7 @@ package ru.ifmo.dto; +import java.time.LocalDateTime; + import lombok.Data; @Data @@ -7,4 +9,5 @@ public class SessionInfo { private String sessionId; private int expectedTaskCount; private String description; + private LocalDateTime startTime; } \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java index 78eb915..3a7449b 100644 --- a/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java +++ b/producer/src/main/java/ru/ifmo/service/MessagePublisherService.java @@ -1,5 +1,7 @@ package ru.ifmo.service; +import java.util.List; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -8,8 +10,6 @@ import ru.ifmo.dto.SessionInfo; import ru.ifmo.dto.TextTask; -import java.util.List; - @Service @RequiredArgsConstructor @Slf4j @@ -25,6 +25,9 @@ public class MessagePublisherService { @Value("${rabbitmq.session.routing.key}") private String sessionRoutingKey; + + @Value("${rabbitmq.queue.name}") + private String queueName; public void publishSessionInfo(SessionInfo sessionInfo) { try { @@ -51,7 +54,7 @@ public void publishTasks(List tasks) { for (TextTask task : tasks) { try { - rabbitTemplate.convertAndSend(tasksExchange, "", task); + rabbitTemplate.convertAndSend(queueName, task); successCount++; log.debug("Published task: {}", task.getTaskId()); } catch (Exception e) { @@ -67,14 +70,4 @@ public void publishTasks(List tasks) { failureCount, tasks.size())); } } - - public void publishTask(TextTask task) { - try { - rabbitTemplate.convertAndSend(tasksExchange, "", task); - log.info("Published single task: {}", task.getTaskId()); - } catch (Exception e) { - log.error("Failed to publish task {}: {}", task.getTaskId(), e.getMessage(), e); - throw new RuntimeException("Failed to publish task", e); - } - } } \ No newline at end of file diff --git a/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java index 968533f..9fd9406 100644 --- a/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java +++ b/producer/src/main/java/ru/ifmo/service/StartupTextProcessingService.java @@ -24,12 +24,12 @@ public void processDefaultTextOnStartup() { log.info("Application started, checking for default text processing..."); // Check if sample text file exists and process it - ClassPathResource resource = new ClassPathResource("sample-text.txt"); + ClassPathResource resource = new ClassPathResource("big.txt"); if (resource.exists()) { log.info("Found sample text file, processing..."); // Copy to temp file for processing - Path tempFile = Files.createTempFile("sample-text", ".txt"); + Path tempFile = Files.createTempFile("big", ".txt"); Files.copy(resource.getInputStream(), tempFile, java.nio.file.StandardCopyOption.REPLACE_EXISTING); diff --git a/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java index 6874e27..d8a89ca 100644 --- a/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java +++ b/producer/src/main/java/ru/ifmo/service/TextProcessingJobService.java @@ -1,88 +1,87 @@ package ru.ifmo.service; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.List; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import ru.ifmo.dto.SessionInfo; import ru.ifmo.dto.TextTask; -import java.io.IOException; -import java.util.List; - @Service @RequiredArgsConstructor @Slf4j public class TextProcessingJobService { - + private final TextSplitterService textSplitterService; private final MessagePublisherService messagePublisherService; - - public String processTextFile(String filePath) throws IOException { - return processTextFile(filePath, SplitStrategy.BY_PARAGRAPHS, 1000); - } - + public String processTextFile(String filePath, SplitStrategy strategy, int splitSize) throws IOException { log.info("Starting text processing job for file: {}", filePath); - + // Generate session ID String sessionId = textSplitterService.generateSessionId(); log.info("Generated session ID: {}", sessionId); - + // Read and split the text file List tasks = splitTextFile(filePath, sessionId, strategy, splitSize); - + if (tasks.isEmpty()) { throw new IllegalArgumentException("No tasks generated from file: " + filePath); } - + // Send session info to aggregator SessionInfo sessionInfo = new SessionInfo(); sessionInfo.setSessionId(sessionId); sessionInfo.setExpectedTaskCount(tasks.size()); sessionInfo.setDescription("Text processing job for file: " + filePath); - + sessionInfo.setStartTime(LocalDateTime.now()); + messagePublisherService.publishSessionInfo(sessionInfo); - + // Send tasks to workers messagePublisherService.publishTasks(tasks); - + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); return sessionId; } - + public String processTextContent(String text, String description) { return processTextContent(text, description, SplitStrategy.BY_PARAGRAPHS, 1000); } - + public String processTextContent(String text, String description, SplitStrategy strategy, int splitSize) { log.info("Starting text processing job for content: {}", description); - + // Generate session ID String sessionId = textSplitterService.generateSessionId(); log.info("Generated session ID: {}", sessionId); - + // Split the text content List tasks = splitTextContent(text, sessionId, strategy, splitSize); - + if (tasks.isEmpty()) { throw new IllegalArgumentException("No tasks generated from text content"); } - + // Send session info to aggregator SessionInfo sessionInfo = new SessionInfo(); sessionInfo.setSessionId(sessionId); sessionInfo.setExpectedTaskCount(tasks.size()); sessionInfo.setDescription(description); - + sessionInfo.setStartTime(LocalDateTime.now()); + messagePublisherService.publishSessionInfo(sessionInfo); - + // Send tasks to workers messagePublisherService.publishTasks(tasks); - + log.info("Text processing job completed. Session: {}, Tasks: {}", sessionId, tasks.size()); return sessionId; } - + private List splitTextFile(String filePath, String sessionId, SplitStrategy strategy, int splitSize) throws IOException { switch (strategy) { case BY_PARAGRAPHS: @@ -97,7 +96,7 @@ private List splitTextFile(String filePath, String sessionId, SplitStr throw new IllegalArgumentException("Unknown split strategy: " + strategy); } } - + private List splitTextContent(String text, String sessionId, SplitStrategy strategy, int splitSize) { switch (strategy) { case BY_PARAGRAPHS: @@ -110,7 +109,7 @@ private List splitTextContent(String text, String sessionId, SplitStra throw new IllegalArgumentException("Unknown split strategy: " + strategy); } } - + public enum SplitStrategy { BY_PARAGRAPHS, BY_SENTENCES, diff --git a/producer/src/main/resources/application.properties b/producer/src/main/resources/application.properties index 702fec0..2d782b7 100644 --- a/producer/src/main/resources/application.properties +++ b/producer/src/main/resources/application.properties @@ -7,6 +7,7 @@ spring.rabbitmq.password=guest # Tasks exchange configuration (for sending tasks to workers) rabbitmq.tasks.exchange.name=textTopic +rabbitmq.queue.name=workerQueue # Session info exchange configuration (for sending session info to aggregator) rabbitmq.session.exchange.name=sessionInfoExchange diff --git a/worker/pom.xml b/worker/pom.xml index e64b41d..96fe267 100644 --- a/worker/pom.xml +++ b/worker/pom.xml @@ -50,6 +50,10 @@ com.fasterxml.jackson.core jackson-core + + com.fasterxml.jackson.datatype + jackson-datatype-jsr310 + diff --git a/worker/src/main/java/ru/ifmo/MessagesListener.java b/worker/src/main/java/ru/ifmo/MessagesListener.java index 9d26e9a..20d13a5 100644 --- a/worker/src/main/java/ru/ifmo/MessagesListener.java +++ b/worker/src/main/java/ru/ifmo/MessagesListener.java @@ -24,6 +24,9 @@ public class MessagesListener { @Value("${rabbitmq.results.exchange.name}") private String resultsExchange; + @Value("${rabbitmq.results.queue.name}") + private String resultsQueue; + @Value("${rabbitmq.results.routing.key}") private String resultsRoutingKey; @@ -53,7 +56,7 @@ public void receiveMessage(String message) { private void sendResult(TextProcessingResult result) { try { - rabbitTemplate.convertAndSend(resultsExchange, resultsRoutingKey, result); + rabbitTemplate.convertAndSend(resultsQueue, result); log.info("Result sent for task: {}", result.getTaskId()); } catch (Exception e) { log.error("Failed to send result for task {}: {}", result.getTaskId(), e.getMessage(), e); diff --git a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java index 731fae1..bf024a5 100644 --- a/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java +++ b/worker/src/main/java/ru/ifmo/configuration/RabbitConfig.java @@ -23,10 +23,10 @@ public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { } @Bean - public FanoutExchange fanoutExchange( + public DirectExchange fanoutExchange( @Value("${rabbitmq.exchange.name}") String exchangeName ) { - return new FanoutExchange(exchangeName); + return new DirectExchange(exchangeName); } @Bean @@ -36,14 +36,6 @@ public Queue queue( return new Queue(queueName); } - @Bean - public Binding binding( - Queue queue, - FanoutExchange fanoutExchange - ) { - return BindingBuilder.bind(queue).to(fanoutExchange); - } - @Bean public DirectExchange resultsExchange( @Value("${rabbitmq.results.exchange.name}") String exchangeName @@ -57,13 +49,4 @@ public Queue resultsQueue( ) { return new Queue(queueName); } - - @Bean - public Binding resultsBinding( - Queue resultsQueue, - DirectExchange resultsExchange, - @Value("${rabbitmq.results.routing.key}") String routingKey - ) { - return BindingBuilder.bind(resultsQueue).to(resultsExchange).with(routingKey); - } }