diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f5eb3a4
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,6 @@
+tmp
+.idea
+results
+**/build/**
+**/.gradle/**
+**.iml
\ No newline at end of file
diff --git a/.lfsconfig b/.lfsconfig
new file mode 100644
index 0000000..9cbf9f4
--- /dev/null
+++ b/.lfsconfig
@@ -0,0 +1,2 @@
+[lfs]
+ url = https://github.com/AdvancedJavaLabs/lab3-xGodness.git/info/lfs
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..ef91aab
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,8 @@
+build:
+ @cd sales-analyzer && ./gradlew build
+
+up: down
+ @docker compose --env-file hadoop.env up --build
+
+down:
+ @docker compose down
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..c4eea91
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,13 @@
+services:
+ hadoop:
+ build:
+ context: .
+ dockerfile: hadoop/Dockerfile
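+    # Tuning knobs read by start.sh and MainJob; default values live in hadoop.env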
+ environment:
+ MAPRED_NUM_REDUCE_TASKS: $MAPRED_NUM_REDUCE_TASKS
+ SPLIT_MINSIZE: $SPLIT_MINSIZE
+ ports:
+ - "9870:9870"
+ volumes:
+ - ./results:/results
diff --git a/hadoop.env b/hadoop.env
new file mode 100644
index 0000000..17c7728
--- /dev/null
+++ b/hadoop.env
@@ -0,0 +1,2 @@
+MAPRED_NUM_REDUCE_TASKS=5
+SPLIT_MINSIZE=4096
\ No newline at end of file
diff --git a/hadoop/Dockerfile b/hadoop/Dockerfile
new file mode 100644
index 0000000..2643ff7
--- /dev/null
+++ b/hadoop/Dockerfile
@@ -0,0 +1,71 @@
+# Download hadoop
+FROM eclipse-temurin:17-jdk AS base
+
+ENV HADOOP_VERSION=3.4.2
+RUN wget -O /tmp/hadoop.tar.gz "https://dlcdn.apache.org/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION-lean.tar.gz"
+
+
+# Install dependencies
+FROM base AS deps
+
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends ssh pdsh sudo
+
+
+# Install hadoop
+FROM deps AS hadoop-installation
+
+ENV HADOOP_HOME=/opt/hadoop
+ENV HADOOP_CONFIG_DIR=$HADOOP_HOME/etc/hadoop
+ENV PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
+ENV HADOOP_USER=hadoop
+ENV HADOOP_UID=1001
+ENV HADOOP_GID=1001
+
+RUN groupadd -g $HADOOP_GID $HADOOP_USER && \
+ useradd -u $HADOOP_UID -g $HADOOP_GID -r -m -s /bin/bash $HADOOP_USER && \
+ usermod -aG sudo $HADOOP_USER && \
+ echo "$HADOOP_USER ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers
+
+RUN tar -xzf /tmp/hadoop.tar.gz -C /opt && \
+ mv /opt/hadoop-$HADOOP_VERSION /opt/hadoop && \
+ rm /tmp/hadoop.tar.gz
+
+
+# Configure hadoop
+FROM hadoop-installation AS hadoop-configuration
+
+COPY hadoop/core-site.xml $HADOOP_CONFIG_DIR/core-site.xml
+COPY hadoop/hdfs-site.xml $HADOOP_CONFIG_DIR/hdfs-site.xml
+
+
+# Prepare runtime
+FROM hadoop-configuration AS runtime
+
+RUN echo "export JAVA_HOME=$JAVA_HOME" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
+RUN echo "export HDFS_NAMENODE_USER=$HADOOP_USER" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
+RUN echo "export HDFS_DATANODE_USER=$HADOOP_USER" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
+RUN echo "export HDFS_HADOOP_USER=$HADOOP_USER" >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh
+
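+# The application jar must already exist in the build context; run 'make build' on the host first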
+ENV ARTIFACT_NAME=sales-analyzer
+ENV JAR_NAME=$ARTIFACT_NAME-1.0.jar
+ENV BUILD_PATH=$ARTIFACT_NAME/build/libs
+COPY $BUILD_PATH/$JAR_NAME .
+
+COPY *.csv .
+
+COPY hadoop/start.sh start.sh
+COPY hadoop/start_benchmark.sh start_benchmark.sh
+RUN chmod +x start.sh
+RUN chmod +x start_benchmark.sh
+
+USER $HADOOP_USER
+
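+# Passwordless SSH to localhost is required by the start-dfs.sh and start-yarn.sh helper scripts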
+RUN ssh-keygen -t rsa -P "" -f ~/.ssh/id_rsa && \
+ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys && \
+ chmod 600 ~/.ssh/authorized_keys
+
+ENTRYPOINT ["./start.sh"]
+#ENTRYPOINT ["./start_benchmark.sh"]
diff --git a/hadoop/core-site.xml b/hadoop/core-site.xml
new file mode 100644
index 0000000..e88f92a
--- /dev/null
+++ b/hadoop/core-site.xml
@@ -0,0 +1,6 @@
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://localhost:9000</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/hadoop/hdfs-site.xml b/hadoop/hdfs-site.xml
new file mode 100644
index 0000000..32199df
--- /dev/null
+++ b/hadoop/hdfs-site.xml
@@ -0,0 +1,10 @@
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+    <property>
+        <name>dfs.permissions.enabled</name>
+        <value>false</value>
+    </property>
+</configuration>
\ No newline at end of file
diff --git a/hadoop/start.sh b/hadoop/start.sh
new file mode 100644
index 0000000..f11e129
--- /dev/null
+++ b/hadoop/start.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+set -e
+
+timestamp=$(date +"%Y-%m-%d_%H-%M-%S")
+dir=$timestamp-tasks-$MAPRED_NUM_REDUCE_TASKS-split-$SPLIT_MINSIZE
+
+sudo service ssh start
+sudo chmod o+w /results
+
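+# Format HDFS only when the namenode has not been initialized yet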
+NAMENODE_DIR=/opt/hadoop/data/namenode
+if [ ! -d "$NAMENODE_DIR/current" ]; then
+ hdfs namenode -format
+fi
+
+start-dfs.sh
+start-yarn.sh
+
+hadoop fs -mkdir /csv
+hadoop fs -put *.csv /csv
+
+echo "Initialization completed"
+echo "Starting job..."
+
+start_time=$(date +%s%N)
+hadoop jar sales-analyzer-1.0.jar org.itmo.MainJob $MAPRED_NUM_REDUCE_TASKS out /csv/*.csv
+end_time=$(date +%s%N)
+
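+# date +%s%N prints nanoseconds, so integer-divide the delta by 1,000,000 for milliseconds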
+duration_ms=$(($((end_time - start_time)) / 1000000))
+
+mkdir results/$dir
+
+echo $duration_ms > results/$dir/time_elapsed_ms
+
+hadoop fs -get out/part-r-00000 results/$dir/mapred.csv
+sed -i '1i category,revenue,quantity' results/$dir/mapred.csv
+
+hadoop fs -rm -r out
+hadoop fs -rm -r mapred_out
+
+#tail -f /dev/null
diff --git a/hadoop/start_benchmark.sh b/hadoop/start_benchmark.sh
new file mode 100644
index 0000000..8201767
--- /dev/null
+++ b/hadoop/start_benchmark.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+set -e
+
+sudo service ssh start
+sudo chmod o+w /results
+
+NAMENODE_DIR=/opt/hadoop/data/namenode
+if [ ! -d "$NAMENODE_DIR/current" ]; then
+ hdfs namenode -format
+fi
+
+start-dfs.sh
+start-yarn.sh
+
+hadoop fs -mkdir /csv
+hadoop fs -put *.csv /csv
+
+echo "Initialization completed"
+
+MAPRED_NUM_REDUCE_TASKS_SET=(1 3 5 10 15 20)
+SPLIT_MINSIZE_SET=(1024 2048 8192 32768 131072 262144)
+
+for MAPRED_NUM_REDUCE_TASKS in "${MAPRED_NUM_REDUCE_TASKS_SET[@]}"; do
+ for SPLIT_MINSIZE in "${SPLIT_MINSIZE_SET[@]}"; do
+ timestamp=$(date +"%Y-%m-%d_%H-%M-%S")
+ dir=$timestamp-tasks-$MAPRED_NUM_REDUCE_TASKS-split-$SPLIT_MINSIZE
+
+ echo "Running with MAPRED_NUM_REDUCE_TASKS=$MAPRED_NUM_REDUCE_TASKS SPLIT_MINSIZE=$SPLIT_MINSIZE"
+        # Export the sweep values so the hadoop jar child process (MainJob) sees SPLIT_MINSIZE in its environment
+        export MAPRED_NUM_REDUCE_TASKS SPLIT_MINSIZE
+
+ start_time=$(date +%s%N)
+ hadoop jar sales-analyzer-1.0.jar org.itmo.MainJob $MAPRED_NUM_REDUCE_TASKS out /csv/*.csv
+ end_time=$(date +%s%N)
+
+ duration_ms=$(($((end_time - start_time)) / 1000000))
+
+ mkdir results/$dir
+
+ echo $duration_ms > results/$dir/time_elapsed_ms
+
+ hadoop fs -get out/part-r-00000 results/$dir/mapred.csv
+ sed -i '1i category,revenue,quantity' results/$dir/mapred.csv
+
+ hadoop fs -rm -r out
+ hadoop fs -rm -r mapred_out
+ done
+done
+
+#tail -f /dev/null
diff --git a/sales-analyzer/build.gradle.kts b/sales-analyzer/build.gradle.kts
new file mode 100644
index 0000000..ed1da67
--- /dev/null
+++ b/sales-analyzer/build.gradle.kts
@@ -0,0 +1,22 @@
+plugins {
+ id("java")
+}
+
+java {
+ toolchain {
+ languageVersion.set(JavaLanguageVersion.of(17))
+ }
+}
+
+group = "org.itmo"
+version = "1.0"
+
+repositories {
+ mavenCentral()
+}
+
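+// Hadoop provides these on the classpath at runtime via 'hadoop jar', so compileOnly keeps them out of the jar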
+dependencies {
+ compileOnly("org.apache.hadoop:hadoop-common:3.4.2")
+ compileOnly("org.apache.hadoop:hadoop-mapreduce-client-core:3.4.2")
+}
diff --git a/sales-analyzer/gradle/wrapper/gradle-wrapper.jar b/sales-analyzer/gradle/wrapper/gradle-wrapper.jar
new file mode 100644
index 0000000..249e583
Binary files /dev/null and b/sales-analyzer/gradle/wrapper/gradle-wrapper.jar differ
diff --git a/sales-analyzer/gradle/wrapper/gradle-wrapper.properties b/sales-analyzer/gradle/wrapper/gradle-wrapper.properties
new file mode 100644
index 0000000..628ccde
--- /dev/null
+++ b/sales-analyzer/gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,6 @@
+#Sat Dec 13 15:56:51 MSK 2025
+distributionBase=GRADLE_USER_HOME
+distributionPath=wrapper/dists
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip
+zipStoreBase=GRADLE_USER_HOME
+zipStorePath=wrapper/dists
diff --git a/sales-analyzer/gradlew b/sales-analyzer/gradlew
new file mode 100755
index 0000000..1b6c787
--- /dev/null
+++ b/sales-analyzer/gradlew
@@ -0,0 +1,234 @@
+#!/bin/sh
+
+#
+# Copyright © 2015-2021 the original authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##############################################################################
+#
+# Gradle start up script for POSIX generated by Gradle.
+#
+# Important for running:
+#
+# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is
+# noncompliant, but you have some other compliant shell such as ksh or
+# bash, then to run this script, type that shell name before the whole
+# command line, like:
+#
+# ksh Gradle
+#
+# Busybox and similar reduced shells will NOT work, because this script
+# requires all of these POSIX shell features:
+# * functions;
+# * expansions «$var», «${var}», «${var:-default}», «${var+SET}»,
+# «${var#prefix}», «${var%suffix}», and «$( cmd )»;
+# * compound commands having a testable exit status, especially «case»;
+# * various built-in commands including «command», «set», and «ulimit».
+#
+# Important for patching:
+#
+# (2) This script targets any POSIX shell, so it avoids extensions provided
+# by Bash, Ksh, etc; in particular arrays are avoided.
+#
+# The "traditional" practice of packing multiple parameters into a
+# space-separated string is a well documented source of bugs and security
+# problems, so this is (mostly) avoided, by progressively accumulating
+# options in "$@", and eventually passing that to Java.
+#
+# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS,
+# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly;
+# see the in-line comments for details.
+#
+# There are tweaks for specific operating systems such as AIX, CygWin,
+# Darwin, MinGW, and NonStop.
+#
+# (3) This script is generated from the Groovy template
+# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt
+# within the Gradle project.
+#
+# You can find Gradle at https://github.com/gradle/gradle/.
+#
+##############################################################################
+
+# Attempt to set APP_HOME
+
+# Resolve links: $0 may be a link
+app_path=$0
+
+# Need this for daisy-chained symlinks.
+while
+ APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path
+ [ -h "$app_path" ]
+do
+ ls=$( ls -ld "$app_path" )
+ link=${ls#*' -> '}
+ case $link in #(
+ /*) app_path=$link ;; #(
+ *) app_path=$APP_HOME$link ;;
+ esac
+done
+
+APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit
+
+APP_NAME="Gradle"
+APP_BASE_NAME=${0##*/}
+
+# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"'
+
+# Use the maximum available, or set MAX_FD != -1 to use that value.
+MAX_FD=maximum
+
+warn () {
+ echo "$*"
+} >&2
+
+die () {
+ echo
+ echo "$*"
+ echo
+ exit 1
+} >&2
+
+# OS specific support (must be 'true' or 'false').
+cygwin=false
+msys=false
+darwin=false
+nonstop=false
+case "$( uname )" in #(
+ CYGWIN* ) cygwin=true ;; #(
+ Darwin* ) darwin=true ;; #(
+ MSYS* | MINGW* ) msys=true ;; #(
+ NONSTOP* ) nonstop=true ;;
+esac
+
+CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
+
+
+# Determine the Java command to use to start the JVM.
+if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD=$JAVA_HOME/jre/sh/java
+ else
+ JAVACMD=$JAVA_HOME/bin/java
+ fi
+ if [ ! -x "$JAVACMD" ] ; then
+ die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+ fi
+else
+ JAVACMD=java
+ which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation."
+fi
+
+# Increase the maximum file descriptors if we can.
+if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then
+ case $MAX_FD in #(
+ max*)
+ MAX_FD=$( ulimit -H -n ) ||
+ warn "Could not query maximum file descriptor limit"
+ esac
+ case $MAX_FD in #(
+ '' | soft) :;; #(
+ *)
+ ulimit -n "$MAX_FD" ||
+ warn "Could not set maximum file descriptor limit to $MAX_FD"
+ esac
+fi
+
+# Collect all arguments for the java command, stacking in reverse order:
+# * args from the command line
+# * the main class name
+# * -classpath
+# * -D...appname settings
+# * --module-path (only if needed)
+# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables.
+
+# For Cygwin or MSYS, switch paths to Windows format before running java
+if "$cygwin" || "$msys" ; then
+ APP_HOME=$( cygpath --path --mixed "$APP_HOME" )
+ CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" )
+
+ JAVACMD=$( cygpath --unix "$JAVACMD" )
+
+ # Now convert the arguments - kludge to limit ourselves to /bin/sh
+ for arg do
+ if
+ case $arg in #(
+ -*) false ;; # don't mess with options #(
+ /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath
+ [ -e "$t" ] ;; #(
+ *) false ;;
+ esac
+ then
+ arg=$( cygpath --path --ignore --mixed "$arg" )
+ fi
+ # Roll the args list around exactly as many times as the number of
+ # args, so each arg winds up back in the position where it started, but
+ # possibly modified.
+ #
+ # NB: a `for` loop captures its iteration list before it begins, so
+ # changing the positional parameters here affects neither the number of
+ # iterations, nor the values presented in `arg`.
+ shift # remove old arg
+ set -- "$@" "$arg" # push replacement arg
+ done
+fi
+
+# Collect all arguments for the java command;
+# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of
+# shell script including quotes and variable substitutions, so put them in
+# double quotes to make sure that they get re-expanded; and
+# * put everything else in single quotes, so that it's not re-expanded.
+
+set -- \
+ "-Dorg.gradle.appname=$APP_BASE_NAME" \
+ -classpath "$CLASSPATH" \
+ org.gradle.wrapper.GradleWrapperMain \
+ "$@"
+
+# Use "xargs" to parse quoted args.
+#
+# With -n1 it outputs one arg per line, with the quotes and backslashes removed.
+#
+# In Bash we could simply go:
+#
+# readarray ARGS < <( xargs -n1 <<<"$var" ) &&
+# set -- "${ARGS[@]}" "$@"
+#
+# but POSIX shell has neither arrays nor command substitution, so instead we
+# post-process each arg (as a line of input to sed) to backslash-escape any
+# character that might be a shell metacharacter, then use eval to reverse
+# that process (while maintaining the separation between arguments), and wrap
+# the whole thing up as a single "set" statement.
+#
+# This will of course break if any of these variables contains a newline or
+# an unmatched quote.
+#
+
+eval "set -- $(
+ printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" |
+ xargs -n1 |
+ sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' |
+ tr '\n' ' '
+ )" '"$@"'
+
+exec "$JAVACMD" "$@"
diff --git a/sales-analyzer/gradlew.bat b/sales-analyzer/gradlew.bat
new file mode 100644
index 0000000..ac1b06f
--- /dev/null
+++ b/sales-analyzer/gradlew.bat
@@ -0,0 +1,89 @@
+@rem
+@rem Copyright 2015 the original author or authors.
+@rem
+@rem Licensed under the Apache License, Version 2.0 (the "License");
+@rem you may not use this file except in compliance with the License.
+@rem You may obtain a copy of the License at
+@rem
+@rem https://www.apache.org/licenses/LICENSE-2.0
+@rem
+@rem Unless required by applicable law or agreed to in writing, software
+@rem distributed under the License is distributed on an "AS IS" BASIS,
+@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@rem See the License for the specific language governing permissions and
+@rem limitations under the License.
+@rem
+
+@if "%DEBUG%" == "" @echo off
+@rem ##########################################################################
+@rem
+@rem Gradle startup script for Windows
+@rem
+@rem ##########################################################################
+
+@rem Set local scope for the variables with windows NT shell
+if "%OS%"=="Windows_NT" setlocal
+
+set DIRNAME=%~dp0
+if "%DIRNAME%" == "" set DIRNAME=.
+set APP_BASE_NAME=%~n0
+set APP_HOME=%DIRNAME%
+
+@rem Resolve any "." and ".." in APP_HOME to make it shorter.
+for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
+
+@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
+set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
+
+@rem Find java.exe
+if defined JAVA_HOME goto findJavaFromJavaHome
+
+set JAVA_EXE=java.exe
+%JAVA_EXE% -version >NUL 2>&1
+if "%ERRORLEVEL%" == "0" goto execute
+
+echo.
+echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:findJavaFromJavaHome
+set JAVA_HOME=%JAVA_HOME:"=%
+set JAVA_EXE=%JAVA_HOME%/bin/java.exe
+
+if exist "%JAVA_EXE%" goto execute
+
+echo.
+echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
+echo.
+echo Please set the JAVA_HOME variable in your environment to match the
+echo location of your Java installation.
+
+goto fail
+
+:execute
+@rem Setup the command line
+
+set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
+
+
+@rem Execute Gradle
+"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
+
+:end
+@rem End local scope for the variables with windows NT shell
+if "%ERRORLEVEL%"=="0" goto mainEnd
+
+:fail
+rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
+rem the _cmd.exe /c_ return code!
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
+exit /b 1
+
+:mainEnd
+if "%OS%"=="Windows_NT" endlocal
+
+:omega
diff --git a/sales-analyzer/settings.gradle.kts b/sales-analyzer/settings.gradle.kts
new file mode 100644
index 0000000..9215df4
--- /dev/null
+++ b/sales-analyzer/settings.gradle.kts
@@ -0,0 +1 @@
+rootProject.name = "sales-analyzer"
diff --git a/sales-analyzer/src/main/java/org/itmo/MainJob.java b/sales-analyzer/src/main/java/org/itmo/MainJob.java
new file mode 100644
index 0000000..5e69f53
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/MainJob.java
@@ -0,0 +1,38 @@
+package org.itmo;
+
+import org.apache.hadoop.conf.Configuration;
+import org.itmo.mapreduce.SalesJob;
+import org.itmo.sort.SortJob;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class MainJob {
+ private static final String SPLIT_MINSIZE_ENV_VAR = "SPLIT_MINSIZE";
+
+ public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+ if (args.length < 3) {
+            System.err.println("usage: MainJob <numReduceTasks> <outputDir> [...input_paths]");
+ System.exit(1);
+ }
+
+ Configuration conf = new Configuration();
+        // SPLIT_MINSIZE is supplied by the container environment; leave Hadoop's default in place when it is absent
+        String splitMinSize = System.getenv(SPLIT_MINSIZE_ENV_VAR);
+        if (splitMinSize != null) {
+            conf.set("mapreduce.input.fileinputformat.split.minsize", splitMinSize);
+        }
+
+ int numReduceTasks = Integer.parseInt(args[0]);
+ String mapredOut = "mapred_out";
+ String sortOut = args[1];
+
+ int salesJobResult = SalesJob.run(conf, numReduceTasks, mapredOut, Arrays.copyOfRange(args, 2, args.length));
+ if (salesJobResult != 0) {
+ System.exit(salesJobResult);
+ }
+
+ int sortJobResult = SortJob.run(conf, sortOut, mapredOut);
+ System.exit(sortJobResult);
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesJob.java b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesJob.java
new file mode 100644
index 0000000..edfe5a9
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesJob.java
@@ -0,0 +1,35 @@
+package org.itmo.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class SalesJob {
+ public static int run(Configuration conf, int numReduceTasks, String outDir, String ...inPaths) throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = Job.getInstance(conf, "Sales analysis job");
+
+ job.setJarByClass(SalesJob.class);
+ job.setNumReduceTasks(numReduceTasks);
+
+ job.setMapperClass(SalesMapper.class);
+ job.setReducerClass(SalesReducer.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(SalesWritable.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(SalesWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(outDir));
+ for (String path : inPaths) {
+ FileInputFormat.addInputPath(job, new Path(path));
+ }
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesMapper.java b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesMapper.java
new file mode 100644
index 0000000..299b170
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesMapper.java
@@ -0,0 +1,46 @@
+package org.itmo.mapreduce;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
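+// Maps a CSV row to (category -> (revenue = price * quantity, quantity)); skips the header, blank lines, and short rows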
+public class SalesMapper extends Mapper<LongWritable, Text, Text, SalesWritable> {
+ private static final String HEADER = "transaction_id,product_id,category,price,quantity";
+ private static final int CATEGORY_INDEX = 2;
+ private static final int PRICE_INDEX = 3;
+ private static final int QUANTITY_INDEX = 4;
+
+ Text categoryWritable = new Text();
+ SalesWritable salesWritable = new SalesWritable();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String csvEntry = value.toString();
+ if (csvEntry == null) {
+ return;
+ }
+ csvEntry = csvEntry.trim();
+
+ if (csvEntry.isBlank() || csvEntry.equals(HEADER)) {
+ return;
+ }
+
+ String[] tokens = csvEntry.split(",");
+ if (tokens.length < 5) {
+ return;
+ }
+
+ String category = tokens[CATEGORY_INDEX].trim();
+ double price = Double.parseDouble(tokens[PRICE_INDEX].trim());
+ long quantity = Long.parseLong(tokens[QUANTITY_INDEX].trim());
+
+ categoryWritable.set(category);
+ salesWritable.setRevenue(price * quantity);
+ salesWritable.setQuantity(quantity);
+
+ context.write(categoryWritable, salesWritable);
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesReducer.java b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesReducer.java
new file mode 100644
index 0000000..a2cd4c7
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesReducer.java
@@ -0,0 +1,27 @@
+package org.itmo.mapreduce;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
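+// Sums the per-category (revenue, quantity) pairs emitted by SalesMapper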
+public class SalesReducer extends Reducer<Text, SalesWritable, Text, SalesWritable> {
+ SalesWritable salesWritable = new SalesWritable();
+
+ @Override
+    protected void reduce(Text key, Iterable<SalesWritable> values, Context context) throws IOException, InterruptedException {
+ double ttlRevenue = 0.0;
+ long ttlQuantity = 0L;
+
+ for (var value : values) {
+ ttlRevenue += value.getRevenue();
+ ttlQuantity += value.getQuantity();
+ }
+
+ salesWritable.setRevenue(ttlRevenue);
+ salesWritable.setQuantity(ttlQuantity);
+
+ context.write(key, salesWritable);
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesWritable.java b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesWritable.java
new file mode 100644
index 0000000..a544cb8
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/mapreduce/SalesWritable.java
@@ -0,0 +1,51 @@
+package org.itmo.mapreduce;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SalesWritable implements Writable {
+ private final DoubleWritable revenueWritable = new DoubleWritable();
+ private final LongWritable quantityWritable = new LongWritable();
+
+ public SalesWritable() {
+ }
+
+ public void setRevenue(double revenue) {
+ revenueWritable.set(revenue);
+ }
+
+ public void setQuantity(long quantity) {
+ quantityWritable.set(quantity);
+ }
+
+ public double getRevenue() {
+ return revenueWritable.get();
+ }
+
+ public long getQuantity() {
+ return quantityWritable.get();
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ revenueWritable.write(dataOutput);
+ quantityWritable.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ revenueWritable.readFields(dataInput);
+ quantityWritable.readFields(dataInput);
+ }
+
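+    // The '#' delimiters keep the text output parseable: SortMapper later splits the whole line on '#'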
+ @Override
+ public String toString() {
+ return "#%.2f#%d".formatted(revenueWritable.get(), quantityWritable.get());
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/sort/DoubleWritableDescComparator.java b/sales-analyzer/src/main/java/org/itmo/sort/DoubleWritableDescComparator.java
new file mode 100644
index 0000000..4e1660e
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/sort/DoubleWritableDescComparator.java
@@ -0,0 +1,19 @@
+package org.itmo.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.WritableComparator;
+
+public class DoubleWritableDescComparator extends WritableComparator {
+ public DoubleWritableDescComparator() {
+ super(DoubleWritable.class);
+ }
+
+ @Override
+ public int compare(byte[] b1, int s1, int l1,
+ byte[] b2, int s2, int l2) {
+        double thisValue = readDouble(b1, s1);
+        double thatValue = readDouble(b2, s2);
+        // Compare in reverse so higher revenues sort first
+        return Double.compare(thatValue, thisValue);
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/sort/SortJob.java b/sales-analyzer/src/main/java/org/itmo/sort/SortJob.java
new file mode 100644
index 0000000..27969e8
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/sort/SortJob.java
@@ -0,0 +1,39 @@
+package org.itmo.sort;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import java.io.IOException;
+
+public class SortJob {
+ public static int run(Configuration conf, String outDir, String... inPaths) throws IOException, InterruptedException, ClassNotFoundException {
+ Job job = Job.getInstance(conf, "Sort job");
+
+ job.setJarByClass(SortJob.class);
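+        // A single reducer yields one globally sorted output file (part-r-00000)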
+ job.setNumReduceTasks(1);
+
+ job.setMapperClass(SortMapper.class);
+ job.setReducerClass(SortReducer.class);
+
+ job.setMapOutputKeyClass(DoubleWritable.class);
+ job.setMapOutputValueClass(SortedSalesWritable.class);
+
+ job.setSortComparatorClass(DoubleWritableDescComparator.class);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(SortedSalesWritable.class);
+
+ FileOutputFormat.setOutputPath(job, new Path(outDir));
+ for (String path : inPaths) {
+ FileInputFormat.addInputPath(job, new Path(path));
+ }
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/sort/SortMapper.java b/sales-analyzer/src/main/java/org/itmo/sort/SortMapper.java
new file mode 100644
index 0000000..ffcf78a
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/sort/SortMapper.java
@@ -0,0 +1,34 @@
+package org.itmo.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import java.io.IOException;
+
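+// Re-keys each line of SalesJob's text output (category<TAB>#revenue#quantity) by revenue for the descending sort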
+public class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, SortedSalesWritable> {
+ private final DoubleWritable revenueWritable = new DoubleWritable();
+ private final SortedSalesWritable sortedSalesWritable = new SortedSalesWritable();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ String line = value.toString();
+ if (line == null || line.isBlank()) {
+ return;
+ }
+
+ String[] tokens = line.split("#");
+ String category = tokens[0].trim();
+ double revenue = Double.parseDouble(tokens[1].trim());
+ long quantity = Long.parseLong(tokens[2].trim());
+
+ revenueWritable.set(revenue);
+ sortedSalesWritable.setCategory(category);
+ sortedSalesWritable.setRevenue(revenue);
+ sortedSalesWritable.setQuantity(quantity);
+
+ context.write(revenueWritable, sortedSalesWritable);
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/sort/SortReducer.java b/sales-analyzer/src/main/java/org/itmo/sort/SortReducer.java
new file mode 100644
index 0000000..4671f5b
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/sort/SortReducer.java
@@ -0,0 +1,18 @@
+package org.itmo.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Reducer;
+
+import java.io.IOException;
+
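+// Pass-through reducer: keys arrive sorted by DoubleWritableDescComparator, so just drop the revenue key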
+public class SortReducer extends Reducer<DoubleWritable, SortedSalesWritable, NullWritable, SortedSalesWritable> {
+
+ @Override
+    protected void reduce(DoubleWritable key, Iterable<SortedSalesWritable> values, Context context) throws IOException, InterruptedException {
+ for (var value : values) {
+ context.write(NullWritable.get(), value);
+ }
+ }
+}
diff --git a/sales-analyzer/src/main/java/org/itmo/sort/SortedSalesWritable.java b/sales-analyzer/src/main/java/org/itmo/sort/SortedSalesWritable.java
new file mode 100644
index 0000000..6ac8d87
--- /dev/null
+++ b/sales-analyzer/src/main/java/org/itmo/sort/SortedSalesWritable.java
@@ -0,0 +1,50 @@
+package org.itmo.sort;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SortedSalesWritable implements Writable {
+ private final Text categoryWritable = new Text();
+ private final DoubleWritable revenueWritable = new DoubleWritable();
+ private final LongWritable quantityWritable = new LongWritable();
+
+ public SortedSalesWritable() {
+ }
+
+ public void setCategory(String category) {
+ categoryWritable.set(category);
+ }
+
+ public void setRevenue(double revenue) {
+ revenueWritable.set(revenue);
+ }
+
+ public void setQuantity(long quantity) {
+ quantityWritable.set(quantity);
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ categoryWritable.write(dataOutput);
+ revenueWritable.write(dataOutput);
+ quantityWritable.write(dataOutput);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ categoryWritable.readFields(dataInput);
+ revenueWritable.readFields(dataInput);
+ quantityWritable.readFields(dataInput);
+ }
+
+ @Override
+ public String toString() {
+ return "%s,%.2f,%d".formatted(categoryWritable.toString(), revenueWritable.get(), quantityWritable.get());
+ }
+}