diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..1fac4d5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,43 @@ +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ +.kotlin + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/.name b/.idea/.name new file mode 100644 index 0000000..ba86dff --- /dev/null +++ b/.idea/.name @@ -0,0 +1 @@ +Solution \ No newline at end of file diff --git a/.idea/gradle.xml b/.idea/gradle.xml new file mode 100644 index 0000000..ce1c62c --- /dev/null +++ b/.idea/gradle.xml @@ -0,0 +1,16 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..0b54c2b --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,8 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..7eec1db --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..288b36b --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/README.md b/README.md index 948dc85..bcefb47 100644 --- 
a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/uyodabcP) ## Лабораторная работа: Реализация MapReduce для анализа данных о продажах с ипользованием HADOOP!!! # Цель работы @@ -45,3 +46,11 @@ # Результаты Результатом работы является сам код, файл с результатами и экспериментальные данные по быстродействию работы написанного кода при изменении числа worker-ов / частей, на которые разбивается файл + +# Полезные Web UI (на хосте) + +- **HDFS NameNode**: `http://localhost:9870` +- **HDFS DataNode**: `http://localhost:9864` +- **YARN ResourceManager**: `http://localhost:8088` +- **YARN NodeManager**: `http://localhost:8042` +- **MapReduce JobHistory**: `http://localhost:8188` diff --git a/Taskfile.yml b/Taskfile.yml new file mode 100644 index 0000000..11b2ef4 --- /dev/null +++ b/Taskfile.yml @@ -0,0 +1,29 @@ +version: 3 + +tasks: + up: + desc: "Start docker compose" + cmds: + - docker compose up -d + + down: + desc: "Stop docker compose" + cmds: + - docker compose down + + csv: + desc: "Upload into HDFS" + cmds: + - bash scripts/hdfs_put_csv.sh ./csv /input/sales + + run: + desc: "Run MapReduce job" + cmds: + - bash scripts/run_job.sh --input /input/sales --output /output/sales_result --reducers 2 --map-threads 4 + - bash scripts/fetch_result.sh --output /output/sales_result --local results/result.tsv + + default: + desc: "csv + run" + cmds: + - task csv + - task run diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..a29611d --- /dev/null +++ b/build.gradle @@ -0,0 +1,50 @@ +plugins { + id 'java' + id 'com.github.johnrengelman.shadow' version '8.1.1' +} + +group = 'ru.taymir.telecom' +version = '1.0-SNAPSHOT' + +repositories { + mavenCentral() +} + +dependencies { + implementation 'org.apache.hadoop:hadoop-common:3.2.1' + implementation 
'org.apache.hadoop:hadoop-mapreduce-client-core:3.2.1' + implementation 'org.apache.hadoop:hadoop-mapreduce-client-jobclient:3.2.1' + + testImplementation platform('org.junit:junit-bom:5.10.0') + testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly 'org.junit.platform:junit-platform-launcher' +} + +java { + toolchain { + languageVersion = JavaLanguageVersion.of(8) + } +} + +tasks.withType(JavaCompile).configureEach { + sourceCompatibility = JavaVersion.VERSION_1_8 + targetCompatibility = JavaVersion.VERSION_1_8 + options.encoding = 'UTF-8' +} + +jar { + manifest { + attributes( + 'Main-Class': 'ru.taymir.telecom.sales.Driver' + ) + } +} + +shadowJar { + archiveClassifier.set('all') + mergeServiceFiles() +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/0.csv b/csv/0.csv similarity index 100% rename from 0.csv rename to csv/0.csv diff --git a/1.csv b/csv/1.csv similarity index 100% rename from 1.csv rename to csv/1.csv diff --git a/2.csv b/csv/2.csv similarity index 100% rename from 2.csv rename to csv/2.csv diff --git a/3.csv b/csv/3.csv similarity index 100% rename from 3.csv rename to csv/3.csv diff --git a/4.csv b/csv/4.csv similarity index 100% rename from 4.csv rename to csv/4.csv diff --git a/5.csv b/csv/5.csv similarity index 100% rename from 5.csv rename to csv/5.csv diff --git a/6.csv b/csv/6.csv similarity index 100% rename from 6.csv rename to csv/6.csv diff --git a/7.csv b/csv/7.csv similarity index 100% rename from 7.csv rename to csv/7.csv diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..7204380 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,76 @@ +services: + namenode: + image: bde2020/hadoop-namenode:2.0.0-hadoop3.2.1-java8 + container_name: hadoop-namenode + restart: unless-stopped + env_file: + - ./hadoop/hadoop.env + environment: + - CLUSTER_NAME=${HADOOP_CLUSTER_NAME:-lab3-hadoop} + ports: + - "9870:9870" + - "9000:9000" + volumes: + - 
namenode:/hadoop/dfs/name + + datanode: + image: bde2020/hadoop-datanode:2.0.0-hadoop3.2.1-java8 + container_name: hadoop-datanode + restart: unless-stopped + env_file: + - ./hadoop/hadoop.env + environment: + - SERVICE_PRECONDITION=namenode:9870 + ports: + - "9864:9864" + volumes: + - datanode:/hadoop/dfs/data + depends_on: + - namenode + + resourcemanager: + image: bde2020/hadoop-resourcemanager:2.0.0-hadoop3.2.1-java8 + container_name: hadoop-resourcemanager + restart: unless-stopped + env_file: + - ./hadoop/hadoop.env + environment: + - SERVICE_PRECONDITION=namenode:9000 namenode:9870 datanode:9864 + ports: + - "8088:8088" + depends_on: + - namenode + - datanode + + nodemanager: + image: bde2020/hadoop-nodemanager:2.0.0-hadoop3.2.1-java8 + container_name: hadoop-nodemanager + restart: unless-stopped + env_file: + - ./hadoop/hadoop.env + environment: + - SERVICE_PRECONDITION=namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088 + ports: + - "8042:8042" + depends_on: + - resourcemanager + + historyserver: + image: bde2020/hadoop-historyserver:2.0.0-hadoop3.2.1-java8 + container_name: hadoop-historyserver + restart: unless-stopped + env_file: + - ./hadoop/hadoop.env + environment: + - SERVICE_PRECONDITION=namenode:9000 namenode:9870 datanode:9864 resourcemanager:8088 + ports: + - "8188:8188" + volumes: + - historyserver:/hadoop/yarn/timeline + depends_on: + - resourcemanager + +volumes: + namenode: + datanode: + historyserver: diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..249e583 Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..7f8cf76 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sun Dec 14 22:42:04 MSK 2025 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists 
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..1b6c787 --- /dev/null +++ b/gradlew @@ -0,0 +1,234 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +# +# Gradle start up script for POSIX generated by Gradle. +# +# Important for running: +# +# (1) You need a POSIX-compliant shell to run this script. If your /bin/sh is +# noncompliant, but you have some other compliant shell such as ksh or +# bash, then to run this script, type that shell name before the whole +# command line, like: +# +# ksh Gradle +# +# Busybox and similar reduced shells will NOT work, because this script +# requires all of these POSIX shell features: +# * functions; +# * expansions «$var», «${var}», «${var:-default}», «${var+SET}», +# «${var#prefix}», «${var%suffix}», and «$( cmd )»; +# * compound commands having a testable exit status, especially «case»; +# * various built-in commands including «command», «set», and «ulimit». +# +# Important for patching: +# +# (2) This script targets any POSIX shell, so it avoids extensions provided +# by Bash, Ksh, etc; in particular arrays are avoided. 
+# +# The "traditional" practice of packing multiple parameters into a +# space-separated string is a well documented source of bugs and security +# problems, so this is (mostly) avoided, by progressively accumulating +# options in "$@", and eventually passing that to Java. +# +# Where the inherited environment variables (DEFAULT_JVM_OPTS, JAVA_OPTS, +# and GRADLE_OPTS) rely on word-splitting, this is performed explicitly; +# see the in-line comments for details. +# +# There are tweaks for specific operating systems such as AIX, CygWin, +# Darwin, MinGW, and NonStop. +# +# (3) This script is generated from the Groovy template +# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# within the Gradle project. +# +# You can find Gradle at https://github.com/gradle/gradle/. +# +############################################################################## + +# Attempt to set APP_HOME + +# Resolve links: $0 may be a link +app_path=$0 + +# Need this for daisy-chained symlinks. +while + APP_HOME=${app_path%"${app_path##*/}"} # leaves a trailing /; empty if no leading path + [ -h "$app_path" ] +do + ls=$( ls -ld "$app_path" ) + link=${ls#*' -> '} + case $link in #( + /*) app_path=$link ;; #( + *) app_path=$APP_HOME$link ;; + esac +done + +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit + +APP_NAME="Gradle" +APP_BASE_NAME=${0##*/} + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD=maximum + +warn () { + echo "$*" +} >&2 + +die () { + echo + echo "$*" + echo + exit 1 +} >&2 + +# OS specific support (must be 'true' or 'false'). 
+cygwin=false +msys=false +darwin=false +nonstop=false +case "$( uname )" in #( + CYGWIN* ) cygwin=true ;; #( + Darwin* ) darwin=true ;; #( + MSYS* | MINGW* ) msys=true ;; #( + NONSTOP* ) nonstop=true ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD=$JAVA_HOME/jre/sh/java + else + JAVACMD=$JAVA_HOME/bin/java + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD=java + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then + case $MAX_FD in #( + max*) + MAX_FD=$( ulimit -H -n ) || + warn "Could not query maximum file descriptor limit" + esac + case $MAX_FD in #( + '' | soft) :;; #( + *) + ulimit -n "$MAX_FD" || + warn "Could not set maximum file descriptor limit to $MAX_FD" + esac +fi + +# Collect all arguments for the java command, stacking in reverse order: +# * args from the command line +# * the main class name +# * -classpath +# * -D...appname settings +# * --module-path (only if needed) +# * DEFAULT_JVM_OPTS, JAVA_OPTS, and GRADLE_OPTS environment variables. 
+ +# For Cygwin or MSYS, switch paths to Windows format before running java +if "$cygwin" || "$msys" ; then + APP_HOME=$( cygpath --path --mixed "$APP_HOME" ) + CLASSPATH=$( cygpath --path --mixed "$CLASSPATH" ) + + JAVACMD=$( cygpath --unix "$JAVACMD" ) + + # Now convert the arguments - kludge to limit ourselves to /bin/sh + for arg do + if + case $arg in #( + -*) false ;; # don't mess with options #( + /?*) t=${arg#/} t=/${t%%/*} # looks like a POSIX filepath + [ -e "$t" ] ;; #( + *) false ;; + esac + then + arg=$( cygpath --path --ignore --mixed "$arg" ) + fi + # Roll the args list around exactly as many times as the number of + # args, so each arg winds up back in the position where it started, but + # possibly modified. + # + # NB: a `for` loop captures its iteration list before it begins, so + # changing the positional parameters here affects neither the number of + # iterations, nor the values presented in `arg`. + shift # remove old arg + set -- "$@" "$arg" # push replacement arg + done +fi + +# Collect all arguments for the java command; +# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of +# shell script including quotes and variable substitutions, so put them in +# double quotes to make sure that they get re-expanded; and +# * put everything else in single quotes, so that it's not re-expanded. + +set -- \ + "-Dorg.gradle.appname=$APP_BASE_NAME" \ + -classpath "$CLASSPATH" \ + org.gradle.wrapper.GradleWrapperMain \ + "$@" + +# Use "xargs" to parse quoted args. +# +# With -n1 it outputs one arg per line, with the quotes and backslashes removed. 
+# +# In Bash we could simply go: +# +# readarray ARGS < <( xargs -n1 <<<"$var" ) && +# set -- "${ARGS[@]}" "$@" +# +# but POSIX shell has neither arrays nor command substitution, so instead we +# post-process each arg (as a line of input to sed) to backslash-escape any +# character that might be a shell metacharacter, then use eval to reverse +# that process (while maintaining the separation between arguments), and wrap +# the whole thing up as a single "set" statement. +# +# This will of course break if any of these variables contains a newline or +# an unmatched quote. +# + +eval "set -- $( + printf '%s\n' "$DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS" | + xargs -n1 | + sed ' s~[^-[:alnum:]+,./:=@_]~\\&~g; ' | + tr '\n' ' ' + )" '"$@"' + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..107acd3 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. 
+set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/hadoop/hadoop.env b/hadoop/hadoop.env new file mode 100644 index 0000000..f058472 --- /dev/null +++ b/hadoop/hadoop.env @@ -0,0 +1,43 @@ +# Base Hadoop config +CORE_CONF_fs_defaultFS=hdfs://namenode:9000 +HDFS_CONF_dfs_replication=1 +HDFS_CONF_dfs_webhdfs_enabled=true + +# YARN +YARN_CONF_yarn_resourcemanager_hostname=resourcemanager +YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032 +YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030 +# NOTE: bde2020 entrypoint encodes '-' as '___' and '.' as '_' in env var names. +YARN_CONF_yarn_resourcemanager_resource___tracker_address=resourcemanager:8031 +YARN_CONF_yarn_resourcemanager_admin_address=resourcemanager:8033 +YARN_CONF_yarn_resourcemanager_webapp_address=resourcemanager:8088 + +# Make sure NodeManager advertises some resources +YARN_CONF_yarn_nodemanager_resource_memory___mb=4096 +YARN_CONF_yarn_nodemanager_resource_cpu___vcores=2 + +# Scheduler allocations +YARN_CONF_yarn_scheduler_minimum___allocation___mb=512 +YARN_CONF_yarn_scheduler_maximum___allocation___mb=4096 +YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle +YARN_CONF_yarn_nodemanager_aux___services_mapreduce_shuffle_class=org.apache.hadoop.mapred.ShuffleHandler + +# MapReduce on YARN +MAPRED_CONF_mapreduce_framework_name=yarn +MAPRED_CONF_mapreduce_jobhistory_address=historyserver:10020 +MAPRED_CONF_mapreduce_jobhistory_webapp_address=historyserver:8188 + +YARN_CONF_yarn_application_classpath=/opt/hadoop-3.2.1/etc/hadoop,/opt/hadoop-3.2.1/share/hadoop/common/*,/opt/hadoop-3.2.1/share/hadoop/common/lib/*,/opt/hadoop-3.2.1/share/hadoop/hdfs/*,/opt/hadoop-3.2.1/share/hadoop/hdfs/lib/*,/opt/hadoop-3.2.1/share/hadoop/mapreduce/*,/opt/hadoop-3.2.1/share/hadoop/mapreduce/lib/*,/opt/hadoop-3.2.1/share/hadoop/yarn/*,/opt/hadoop-3.2.1/share/hadoop/yarn/lib/* 
+MAPRED_CONF_mapreduce_application_classpath=/opt/hadoop-3.2.1/etc/hadoop,/opt/hadoop-3.2.1/share/hadoop/common/*,/opt/hadoop-3.2.1/share/hadoop/common/lib/*,/opt/hadoop-3.2.1/share/hadoop/hdfs/*,/opt/hadoop-3.2.1/share/hadoop/hdfs/lib/*,/opt/hadoop-3.2.1/share/hadoop/mapreduce/*,/opt/hadoop-3.2.1/share/hadoop/mapreduce/lib/*,/opt/hadoop-3.2.1/share/hadoop/yarn/*,/opt/hadoop-3.2.1/share/hadoop/yarn/lib/* + +MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1 +MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1 +MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/opt/hadoop-3.2.1 + +MAPRED_CONF_yarn_app_mapreduce_am_resource_mb=512 +MAPRED_CONF_mapreduce_map_memory_mb=512 +MAPRED_CONF_mapreduce_reduce_memory_mb=512 + +HADOOP_ROOT_LOGGER=INFO,console + + diff --git a/results/result.tsv b/results/result.tsv new file mode 100644 index 0000000..c3ff670 --- /dev/null +++ b/results/result.tsv @@ -0,0 +1,20 @@ +clothing 4560302624.16 911487 +video games 4560108748.01 913326 +baby products 4541435808.03 907186 +beauty products 4533874789.00 906417 +gardening tools 4531881276.32 905841 +automotive 4529861769.61 904962 +music instruments 4512294923.00 902389 +furniture 4503987208.87 900244 +electronics 4497527083.38 903266 +pet supplies 4488742181.26 896724 +stationery 4481795362.83 898265 +home appliances 4473888813.45 895815 +sports equipment 4469388254.74 894287 +groceries 4466915682.26 895470 +footwear 4465575438.21 894424 +jewelry 4463824116.38 893980 +office equipment 4463565383.79 892370 +toys 4462454111.26 892741 +books 4457621289.17 890948 +health & wellness 4454083325.20 890475 diff --git a/scripts/bench.sh b/scripts/bench.sh new file mode 100644 index 0000000..ec75fe0 --- /dev/null +++ b/scripts/bench.sh @@ -0,0 +1,45 @@ +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" + +INPUT="/input/sales" +OUT_PREFIX="/output/bench" +SERVICE="${HADOOP_NAMENODE_SERVICE:-namenode}" +LOCAL_CSV="$ROOT_DIR/results/bench.csv" + +REDUCERS_LIST="${REDUCERS_LIST:-1 2 4}" +MAP_THREADS_LIST="${MAP_THREADS_LIST:-1 2 4 8}" + +mkdir -p "$(dirname "$LOCAL_CSV")" +if [[ ! -f "$LOCAL_CSV" ]]; then + echo "timestamp,reducers,map_threads,seconds,output" > "$LOCAL_CSV" +fi + +echo "Benchmarking..." +echo "reducers: $REDUCERS_LIST" +echo "map_threads: $MAP_THREADS_LIST" +echo "input: $INPUT" +echo "out_prefix: $OUT_PREFIX" +echo "results file: $LOCAL_CSV" + +(cd "$ROOT_DIR" && ./gradlew -q shadowJar) + +ts="$(date +%s)" + +for r in $REDUCERS_LIST; do + for t in $MAP_THREADS_LIST; do + out="${OUT_PREFIX}_${ts}_r${r}_t${t}" + start="$(date +%s)" + "$ROOT_DIR/scripts/run_job.sh" --no-build --input "$INPUT" --output "$out" --reducers "$r" --map-threads "$t" + end="$(date +%s)" + secs="$(( end - start ))" + echo "$(date -Iseconds),$r,$t,$secs,$out" >> "$LOCAL_CSV" + + docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -rm -r -f '$out' >/dev/null 2>&1 || true" + done +done + +echo "Done. See: $LOCAL_CSV" + + + diff --git a/scripts/fetch_result.sh b/scripts/fetch_result.sh new file mode 100644 index 0000000..8a80dce --- /dev/null +++ b/scripts/fetch_result.sh @@ -0,0 +1,40 @@ +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" + +SERVICE="${HADOOP_NAMENODE_SERVICE:-namenode}" +OUTPUT="/output/sales_result" +LOCAL="$ROOT_DIR/results/result.tsv" + +usage() { + cat <&2 +Usage: + $0 [--output ] [--local ] + +Defaults: + --output $OUTPUT + --local $LOCAL +EOF +} + +while (( $# > 0 )); do + case "$1" in + --output) OUTPUT="${2:-}"; shift 2 ;; + --local) LOCAL="${2:-}"; shift 2 ;; + -h|--help) usage; exit 0 ;; + *) echo "Unknown arg: $1" >&2; usage; exit 2 ;; + esac +done + +mkdir -p "$(dirname "$LOCAL")" + +PART="$OUTPUT/part-r-00000" +echo "Fetching: HDFS:$PART -> $LOCAL" + +docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -test -e '$PART'" +docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -cat '$PART'" > "$LOCAL" + +echo "Saved: $LOCAL" + + + diff --git a/scripts/hdfs_put_csv.sh b/scripts/hdfs_put_csv.sh new file mode 100644 index 0000000..d738e66 --- /dev/null +++ b/scripts/hdfs_put_csv.sh @@ -0,0 +1,56 @@ +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" + +LOCAL_DIR="${1:-$ROOT_DIR/csv}" +HDFS_DIR="${2:-/input/sales}" +SERVICE="${HADOOP_NAMENODE_SERVICE:-namenode}" + +if ! command -v docker >/dev/null 2>&1; then + echo "docker not found" >&2 + exit 127 +fi + +if [[ ! 
-d "$LOCAL_DIR" ]]; then + echo "Local dir not found: $LOCAL_DIR" >&2 + exit 2 +fi + +shopt -s nullglob +FILES=("$LOCAL_DIR"/*.csv) +shopt -u nullglob + +if (( ${#FILES[@]} == 0 )); then + echo "No .csv files found in: $LOCAL_DIR" >&2 + exit 2 +fi + +for f in "${FILES[@]}"; do + if head -n 1 "$f" | grep -q 'version https://git-lfs.github.com/spec/v1'; then + echo "Detected Git LFS pointer file (not real CSV): $f" >&2 + echo "Run: git lfs install && git lfs pull" >&2 + exit 3 + fi +done + +TMP_DIR="/tmp/csv_upload_$$" + +echo "Creating HDFS dir: $HDFS_DIR" +docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -mkdir -p '$HDFS_DIR'" + +echo "Uploading ${#FILES[@]} files from $LOCAL_DIR to HDFS:$HDFS_DIR" +docker compose exec -T "$SERVICE" bash -lc "rm -rf '$TMP_DIR' && mkdir -p '$TMP_DIR'" + +for f in "${FILES[@]}"; do + base="$(basename "$f")" + docker compose cp "$f" "$SERVICE:$TMP_DIR/$base" + docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -put -f '$TMP_DIR/$base' '$HDFS_DIR/'" +done + +docker compose exec -T "$SERVICE" bash -lc "rm -rf '$TMP_DIR'" + +echo "Done. HDFS listing:" +docker compose exec -T "$SERVICE" bash -lc "'${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs' dfs -ls '$HDFS_DIR' || true" + + + diff --git a/scripts/run_job.sh b/scripts/run_job.sh new file mode 100644 index 0000000..41bebe5 --- /dev/null +++ b/scripts/run_job.sh @@ -0,0 +1,77 @@ +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." 
&& pwd)" + +SERVICE="${HADOOP_JOB_SERVICE:-resourcemanager}" +JAR_IN_CONTAINER="${HADOOP_JOB_JAR_PATH:-/tmp/sales-job.jar}" +HADOOP_BIN="${HADOOP_BIN:-${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hadoop}" +HDFS_BIN="${HDFS_BIN:-${HADOOP_HOME:-/opt/hadoop-3.2.1}/bin/hdfs}" + +INPUT="/input/sales" +OUTPUT="/output/sales_result" +REDUCERS="2" +MAP_THREADS="4" +NO_BUILD="0" + +usage() { + cat <&2 +Usage: + $0 --input --output [--reducers N] [--map-threads N] [--no-build] + +Defaults: + --input $INPUT + --output $OUTPUT + --reducers $REDUCERS + --map-threads $MAP_THREADS +EOF +} + +while (( $# > 0 )); do + case "$1" in + --input) INPUT="${2:-}"; shift 2 ;; + --output) OUTPUT="${2:-}"; shift 2 ;; + --reducers) REDUCERS="${2:-}"; shift 2 ;; + --map-threads) MAP_THREADS="${2:-}"; shift 2 ;; + --no-build) NO_BUILD="1"; shift 1 ;; + -h|--help) usage; exit 0 ;; + *) echo "Unknown arg: $1" >&2; usage; exit 2 ;; + esac +done + +if [[ -z "$INPUT" || -z "$OUTPUT" ]]; then + usage + exit 2 +fi + +JAR_LOCAL="$ROOT_DIR/build/libs/Solution-1.0-SNAPSHOT-all.jar" + +if [[ "$NO_BUILD" != "1" ]]; then + (cd "$ROOT_DIR" && ./gradlew -q shadowJar) +fi + +if [[ ! -f "$JAR_LOCAL" ]]; then + echo "Jar not found: $JAR_LOCAL" >&2 + echo "Try: ./gradlew shadowJar" >&2 + exit 2 +fi + +echo "Copying jar into container: $SERVICE:$JAR_IN_CONTAINER" +docker compose cp "$JAR_LOCAL" "$SERVICE:$JAR_IN_CONTAINER" + +echo "Checking HDFS input exists: $INPUT" +if ! docker compose exec -T "$SERVICE" bash -lc "'$HDFS_BIN' dfs -test -e '$INPUT'"; then + echo "ERROR: HDFS input path does not exist: $INPUT" >&2 + echo "Hint: upload local csv/*.csv into HDFS first, e.g.:" >&2 + echo " bash scripts/hdfs_put_csv.sh ./csv $INPUT" >&2 + exit 4 +fi + +echo "Running Hadoop job on YARN..." +docker compose exec -T "$SERVICE" bash -lc \ + "'$HADOOP_BIN' jar '$JAR_IN_CONTAINER' \ + --input '$INPUT' --output '$OUTPUT' --reducers '$REDUCERS' --map-threads '$MAP_THREADS'" + +echo "Done. 
Output (HDFS): $OUTPUT" +echo "Tip: fetch with scripts/fetch_result.sh --output '$OUTPUT' --local results/result.tsv" + + diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 0000000..6f4f39f --- /dev/null +++ b/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'Solution' \ No newline at end of file diff --git a/src/main/java/ru/taymir/telecom/Main.java b/src/main/java/ru/taymir/telecom/Main.java new file mode 100644 index 0000000..bbcea91 --- /dev/null +++ b/src/main/java/ru/taymir/telecom/Main.java @@ -0,0 +1,9 @@ +package ru.taymir.telecom; + +//TIP To Run code, press or +// click the icon in the gutter. +public class Main { + public static void main(String[] args) { + + } +} \ No newline at end of file diff --git a/src/main/java/ru/taymir/telecom/jobs/Aggregate.java b/src/main/java/ru/taymir/telecom/jobs/Aggregate.java new file mode 100644 index 0000000..5506816 --- /dev/null +++ b/src/main/java/ru/taymir/telecom/jobs/Aggregate.java @@ -0,0 +1,53 @@ +package ru.taymir.telecom.jobs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import ru.taymir.telecom.revenue.QuantityWritable; +import ru.taymir.telecom.sales.SalesMapper; +import ru.taymir.telecom.sales.SumReducer; + +public final class Aggregate { + private Aggregate() { + } + + public static Job configure( + Configuration conf, + Path input, + Path output, + int reducers, + int mapThreads + ) throws Exception + { + Job job = Job.getInstance(conf, "aggregate"); + job.setJarByClass(Aggregate.class); + + FileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + + 
job.setMapperClass(MultithreadedMapper.class); + MultithreadedMapper.setMapperClass(job, SalesMapper.class); + MultithreadedMapper.setNumberOfThreads(job, mapThreads); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(QuantityWritable.class); + + job.setCombinerClass(SumReducer.class); + job.setReducerClass(SumReducer.class); + job.setNumReduceTasks(reducers); + + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(QuantityWritable.class); + + return job; + } +} + + + diff --git a/src/main/java/ru/taymir/telecom/jobs/Sort.java b/src/main/java/ru/taymir/telecom/jobs/Sort.java new file mode 100644 index 0000000..c6e65b9 --- /dev/null +++ b/src/main/java/ru/taymir/telecom/jobs/Sort.java @@ -0,0 +1,91 @@ +package ru.taymir.telecom.jobs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import ru.taymir.telecom.revenue.CategoryKey; +import ru.taymir.telecom.revenue.QuantityWritable; + +import java.io.IOException; + +public final class Sort { + private Sort() { + } + + public static class SortMapper extends Mapper { + private final CategoryKey outKey = new CategoryKey(); + private final QuantityWritable outVal = new QuantityWritable(); + + @Override + protected void map(Text category, QuantityWritable agg, Context context) + throws IOException, InterruptedException { + outKey.set(agg.getRevenueCents(), category.toString()); + outVal.set(agg.getRevenueCents(), agg.getQuantity()); + context.write(outKey, outVal); + } + } + + public static class 
SortReducer extends Reducer { + private static String formatCents(long cents) { + boolean neg = cents < 0; + long abs = Math.abs(cents); + long major = abs / 100; + long minor = abs % 100; + String s = major + "." + (minor < 10 ? "0" + minor : minor); + if (neg) { + return "-" + s; + } + return s; + } + + private final Text outLine = new Text(); + + @Override + protected void reduce(CategoryKey key, Iterable values, Context context) + throws IOException, InterruptedException { + long revenueCents = 0L; + long quantity = 0L; + for (QuantityWritable v : values) { + revenueCents = v.getRevenueCents(); + quantity = v.getQuantity(); + break; + } + + String category = key.getCategory().toString(); + outLine.set(category + "\t" + formatCents(revenueCents) + "\t" + quantity); + context.write(outLine, NullWritable.get()); + } + } + + public static Job configure(Configuration conf, Path input, Path output) throws Exception { + Job job = Job.getInstance(conf, "sales-sort"); + job.setJarByClass(Sort.class); + + job.setInputFormatClass(SequenceFileInputFormat.class); + SequenceFileInputFormat.addInputPath(job, input); + FileOutputFormat.setOutputPath(job, output); + + job.setMapperClass(SortMapper.class); + job.setMapOutputKeyClass(CategoryKey.class); + job.setMapOutputValueClass(QuantityWritable.class); + + job.setNumReduceTasks(1); + job.setReducerClass(SortReducer.class); + + job.setOutputFormatClass(TextOutputFormat.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(NullWritable.class); + + return job; + } +} + + + diff --git a/src/main/java/ru/taymir/telecom/revenue/CategoryKey.java b/src/main/java/ru/taymir/telecom/revenue/CategoryKey.java new file mode 100644 index 0000000..0701720 --- /dev/null +++ b/src/main/java/ru/taymir/telecom/revenue/CategoryKey.java @@ -0,0 +1,84 @@ +package ru.taymir.telecom.revenue; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; + +import java.io.DataInput; +import java.io.DataOutput; 
import java.io.IOException;

/**
 * Composite shuffle key for the Sort job: orders records by revenue
 * descending, then category ascending. equals/hashCode are consistent with
 * compareTo (both use the (revenueCents, category) pair).
 */
public class CategoryKey implements WritableComparable<CategoryKey> {
    private long revenueCents;
    private final Text category = new Text();

    public CategoryKey() {}

    public CategoryKey(long revenueCents, String category) {
        this.revenueCents = revenueCents;
        this.category.set(category);
    }

    public long getRevenueCents() {
        return revenueCents;
    }

    public Text getCategory() {
        return category;
    }

    /** Re-points this reusable key at a new (revenue, category) pair. */
    public void set(long revenueCents, String category) {
        this.revenueCents = revenueCents;
        this.category.set(category);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(revenueCents);
        category.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        revenueCents = in.readLong();
        category.readFields(in);
    }

    @Override
    public int compareTo(CategoryKey other) {
        // Arguments swapped => descending order by revenue.
        int cmp = Long.compare(other.revenueCents, this.revenueCents);
        if (cmp != 0) {
            return cmp;
        }
        // Tie-break: category ascending (Text byte order).
        return this.category.compareTo(other.category);
    }

    @Override
    public int hashCode() {
        int result = Long.hashCode(revenueCents);
        result = 31 * result + category.hashCode();
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CategoryKey)) {
            return false;
        }
        CategoryKey other = (CategoryKey) obj;
        return revenueCents == other.revenueCents && category.equals(other.category);
    }

    @Override
    public String toString() {
        return revenueCents + "\t" + category;
    }
}

// ==== src/main/java/ru/taymir/telecom/revenue/QuantityWritable.java ====
package ru.taymir.telecom.revenue;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Value type carried through both jobs: total revenue in cents plus total
 * quantity for one category. Mutable and reusable, per Hadoop convention.
 */
public class QuantityWritable implements Writable {
    private long revenueCents;
    private long quantity;

    public QuantityWritable() {
    }

    public QuantityWritable(long revenueCents, long quantity) {
        this.revenueCents = revenueCents;
        this.quantity = quantity;
    }

    public long getRevenueCents() {
        return revenueCents;
    }

    public long getQuantity() {
        return quantity;
    }

    public void set(long revenueCents, long quantity) {
        this.revenueCents = revenueCents;
        this.quantity = quantity;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(revenueCents);
        out.writeLong(quantity);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        revenueCents = in.readLong();
        quantity = in.readLong();
    }

    @Override
    public String toString() {
        return revenueCents + "\t" + quantity;
    }
}

// ==== src/main/java/ru/taymir/telecom/sales/Driver.java ====
package ru.taymir.telecom.sales;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import ru.taymir.telecom.jobs.Aggregate;
import ru.taymir.telecom.jobs.Sort;

import java.time.Instant;

/**
 * Pipeline entry point: runs the aggregate job into a timestamped temporary
 * directory, then the sort job into the final output directory, and cleans up
 * the temporary directory afterwards.
 *
 * Exit codes: 0 = success, 1 = aggregate job failed, 2 = sort job failed
 * (or bad CLI arguments, via usageAndExit).
 */
public class Driver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exit = ToolRunner.run(new Configuration(), new Driver(), args);
        System.exit(exit);
    }

    @Override
    public int run(String[] args) throws Exception {
        Args a = Args.parse(args);

        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);

        Path input = new Path(a.input);
        Path finalOutput = new Path(a.output);

        // Timestamped temp dir beside the final output for the intermediate
        // aggregate result; the trailing slashes are stripped first so the
        // suffix attaches to the directory name itself.
        String tmpName = a.output.replaceAll("/+$", "") + "_agg_" + Instant.now().toEpochMilli();
        Path tmpAgg = new Path(tmpName);

        // MapReduce refuses to run if the output dirs already exist.
        if (fs.exists(tmpAgg)) {
            fs.delete(tmpAgg, true);
        }
        if (fs.exists(finalOutput)) {
            fs.delete(finalOutput, true);
        }

        Job jobAggregate = Aggregate.configure(conf, input, tmpAgg, a.reducers, a.mapThreads);
        if (!jobAggregate.waitForCompletion(true)) {
            return 1;
        }

        Job jobSort = Sort.configure(conf, tmpAgg, finalOutput);
        boolean ok = jobSort.waitForCompletion(true);

        try {
            fs.delete(tmpAgg, true);
        } catch (Exception e) {
            // Best-effort cleanup: a leftover temp dir must not fail an
            // otherwise successful pipeline, but do not swallow it silently.
            System.err.println("WARN: failed to delete temp dir " + tmpAgg + ": " + e);
        }

        if (ok) {
            return 0;
        }
        return 2;
    }

    /** Immutable holder for the parsed command-line arguments. */
    private static final class Args {
        final String input;
        final String output;
        final int reducers;
        final int mapThreads;

        private Args(String input, String output, int reducers, int mapThreads) {
            this.input = input;
            this.output = output;
            this.reducers = reducers;
            this.mapThreads = mapThreads;
        }

        /**
         * Parses --input/--output (required) and --reducers/--map-threads
         * (optional, defaults 2 and 4, clamped to >= 1). Unknown flags or
         * missing required arguments print usage and exit(2).
         */
        static Args parse(String[] args) {
            String input = null;
            String output = null;
            int reducers = 2;
            int mapThreads = 4;

            for (int i = 0; i < args.length; i++) {
                String a = args[i];
                if ("--input".equals(a) && i + 1 < args.length) {
                    input = args[++i];
                } else if ("--output".equals(a) && i + 1 < args.length) {
                    output = args[++i];
                } else if ("--reducers".equals(a) && i + 1 < args.length) {
                    reducers = Integer.parseInt(args[++i]);
                } else if ("--map-threads".equals(a) && i + 1 < args.length) {
                    mapThreads = Integer.parseInt(args[++i]);
                } else if ("-h".equals(a) || "--help".equals(a)) {
                    usageAndExit(0);
                } else {
                    usageAndExit(2);
                }
            }

            if (input == null || output == null) {
                usageAndExit(2);
            }
            if (reducers < 1) {
                reducers = 1;
            }
            if (mapThreads < 1) {
                mapThreads = 1;
            }

            return new Args(input, output, reducers, mapThreads);
        }

        static void usageAndExit(int code) {
            // NOTE(review): the original usage text lost its <...> placeholders
            // to markup stripping; restored with descriptive names.
            System.err.println("Usage: hadoop jar <jar> ru.taymir.telecom.sales.Driver "
                    + "--input <hdfs-path> --output <hdfs-path> "
                    + "[--reducers N] [--map-threads N]");
            System.exit(code);
        }
    }
}

// ==== src/main/java/ru/taymir/telecom/sales/SalesMapper.java ====
package ru.taymir.telecom.sales;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import ru.taymir.telecom.revenue.QuantityWritable;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.RoundingMode;

/**
 * Parses one CSV line with the columns:
 * transaction_id, product_id, category, price, quantity
 * and emits (category, QuantityWritable(price*quantity in cents, quantity)).
 *
 * Malformed rows (too few columns, empty fields, non-numeric or non-positive
 * values) and the header row are silently skipped.
 *
 * NOTE(review): the naive split(",") does not handle quoted fields containing
 * commas — confirm the input CSV never quotes values.
 */
public class SalesMapper extends Mapper<LongWritable, Text, Text, QuantityWritable> {
    // Reused output buffers; MultithreadedMapper gives each thread its own
    // mapper instance, so these are thread-confined.
    private final Text outKey = new Text();
    private final QuantityWritable outVal = new QuantityWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }

        // Skip the CSV header (may appear at the start of every split file).
        if (line.startsWith("transaction_id,")) {
            return;
        }

        String[] parts = line.split(",", -1);
        if (parts.length < 5) {
            return;
        }

        String category = parts[2].trim();
        if (category.isEmpty()) {
            return;
        }

        String priceStr = parts[3].trim();
        String qtyStr = parts[4].trim();
        if (priceStr.isEmpty() || qtyStr.isEmpty()) {
            return;
        }

        long quantity;
        try {
            quantity = Long.parseLong(qtyStr);
        } catch (NumberFormatException e) {
            return; // non-numeric quantity: skip the row
        }
        if (quantity <= 0) {
            return;
        }

        // Money is handled as exact cents: BigDecimal(String) avoids binary
        // float artifacts, HALF_UP matches conventional rounding, and
        // longValueExact() refuses silent truncation.
        long priceCents;
        try {
            BigDecimal price = new BigDecimal(priceStr);
            priceCents = price
                    .movePointRight(2)
                    .setScale(0, RoundingMode.HALF_UP)
                    .longValueExact();
        } catch (Exception e) {
            return; // unparseable or out-of-range price: skip the row
        }

        // multiplyExact: overflow should fail the task loudly, not corrupt totals.
        long revenueCents = Math.multiplyExact(priceCents, quantity);

        outKey.set(category);
        outVal.set(revenueCents, quantity);
        context.write(outKey, outVal);
    }
}

// ==== src/main/java/ru/taymir/telecom/sales/SumReducer.java ====
package ru.taymir.telecom.sales;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import ru.taymir.telecom.revenue.QuantityWritable;

import java.io.IOException;

/**
 * Sums revenueCents and quantity per category. Associative and commutative,
 * so the Aggregate job also uses it as the combiner. Uses addExact so an
 * overflow fails the task instead of wrapping silently.
 */
public class SumReducer extends Reducer<Text, QuantityWritable, Text, QuantityWritable> {
    private final QuantityWritable outVal = new QuantityWritable();

    @Override
    protected void reduce(Text key, Iterable<QuantityWritable> values, Context context)
            throws IOException, InterruptedException {
        long sumRevenue = 0L;
        long sumQty = 0L;
        for (QuantityWritable v : values) {
            sumRevenue = Math.addExact(sumRevenue, v.getRevenueCents());
            sumQty = Math.addExact(sumQty, v.getQuantity());
        }
        outVal.set(sumRevenue, sumQty);
        context.write(key, outVal);
    }
}