diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9de0e37 --- /dev/null +++ b/.gitignore @@ -0,0 +1,90 @@ +############################## +## Java +############################## +.mtj.tmp/ +*.class +*.jar +*.war +*.ear +*.nar +hs_err_pid* +replay_pid* + +############################## +## Maven +############################## +target/ +pom.xml.tag +pom.xml.releaseBackup +pom.xml.versionsBackup +pom.xml.next +pom.xml.bak +release.properties +dependency-reduced-pom.xml +buildNumber.properties +.mvn/timing.properties +.mvn/wrapper/maven-wrapper.jar + +############################## +## Gradle +############################## +bin/ +build/ +.gradle +.gradletasknamecache +gradle-app.setting +!gradle-wrapper.jar + +############################## +## IntelliJ +############################## +out/ +.idea/ +.idea_modules/ +*.iml +*.ipr +*.iws + +############################## +## Eclipse +############################## +.settings/ +bin/ +tmp/ +.metadata +.classpath +.project +*.tmp +*.bak +*.swp +*~.nib +local.properties +.loadpath +.factorypath + +############################## +## NetBeans +############################## +nbproject/private/ +build/ +nbbuild/ +dist/ +nbdist/ +nbactions.xml +nb-configuration.xml + +############################## +## Visual Studio Code +############################## +.vscode/ +.code-workspace + +############################## +## OS X +############################## +.DS_Store + +############################## +## Miscellaneous +############################## +*.log diff --git a/README.md b/README.md index 948dc85..84b20f4 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,4 @@ +[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/uyodabcP) ## Лабораторная работа: Реализация MapReduce для анализа данных о продажах с ипользованием HADOOP!!! # Цель работы diff --git a/artifacts/full_log.txt b/artifacts/full_log.txt new file mode 100644 index 0000000..e7db9b2 --- /dev/null +++ b/artifacts/full_log.txt @@ -0,0 +1,245 @@ +Starting cluster... +Setting up HDFS... +Deleted /sales_input +Uploaded files: +Found 8 items +-rw-r--r-- 1 hadoop supergroup 3406784 2025-12-16 06:21 /sales_input/0.csv +-rw-r--r-- 1 hadoop supergroup 7078520 2025-12-16 06:21 /sales_input/1.csv +-rw-r--r-- 1 hadoop supergroup 10737171 2025-12-16 06:21 /sales_input/2.csv +-rw-r--r-- 1 hadoop supergroup 14530705 2025-12-16 06:21 /sales_input/3.csv +-rw-r--r-- 1 hadoop supergroup 18299520 2025-12-16 06:21 /sales_input/4.csv +-rw-r--r-- 1 hadoop supergroup 22053240 2025-12-16 06:21 /sales_input/5.csv +-rw-r--r-- 1 hadoop supergroup 25790880 2025-12-16 06:21 /sales_input/6.csv +-rw-r--r-- 1 hadoop supergroup 29524261 2025-12-16 06:21 /sales_input/7.csv +Compiling... +[INFO] Scanning for projects... +[INFO] +[INFO] ---------------------< org.ifmo.app:lab3-dmfrpro >---------------------- +[INFO] Building lab3-dmfrpro 1.0-SNAPSHOT +[INFO] from pom.xml +[INFO] --------------------------------[ jar ]--------------------------------- +[INFO] +[INFO] --- clean:3.2.0:clean (default-clean) @ lab3-dmfrpro --- +[INFO] Deleting /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/target +[INFO] +[INFO] --- resources:3.3.1:resources (default-resources) @ lab3-dmfrpro --- +[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent! 
+[INFO] skip non existing resourceDirectory /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/src/main/resources +[INFO] +[INFO] --- compiler:3.8.1:compile (default-compile) @ lab3-dmfrpro --- +[INFO] Changes detected - recompiling the module! +[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent! +[INFO] Compiling 6 source files to /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/target/classes +[INFO] +[INFO] --- resources:3.3.1:testResources (default-testResources) @ lab3-dmfrpro --- +[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent! +[INFO] skip non existing resourceDirectory /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/src/test/resources +[INFO] +[INFO] --- compiler:3.8.1:testCompile (default-testCompile) @ lab3-dmfrpro --- +[INFO] No sources to compile +[INFO] +[INFO] --- surefire:3.2.5:test (default-test) @ lab3-dmfrpro --- +[INFO] No tests to run. +[INFO] +[INFO] --- jar:3.4.1:jar (default-jar) @ lab3-dmfrpro --- +[INFO] Building jar: /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/target/lab3-dmfrpro-1.0-SNAPSHOT.jar +[INFO] +[INFO] --- assembly:3.7.1:single (default) @ lab3-dmfrpro --- +[INFO] Building jar: /home/dmfrpro/Assignments/parallel/lab3-dmfrpro/target/lab3-dmfrpro-1.0-SNAPSHOT-jar-with-dependencies.jar +[INFO] ------------------------------------------------------------------------ +[INFO] BUILD SUCCESS +[INFO] ------------------------------------------------------------------------ +[INFO] Total time: 11.772 s +[INFO] Finished at: 2025-12-16T09:21:36+03:00 +[INFO] ------------------------------------------------------------------------ +Running job... +Cleaning output directories... + +=== Starting Job 1: Calculate revenue per category === +2025-12-16 06:21:42 INFO DefaultNoHARMFailoverProxyProvider:64 - Connecting to ResourceManager at resourcemanager/172.19.0.4:8032 +2025-12-16 06:21:42 WARN JobResourceUploader:149 - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. +2025-12-16 06:21:42 INFO JobResourceUploader:907 - Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hadoop/.staging/job_1765865564182_0003 +2025-12-16 06:21:43 INFO FileInputFormat:302 - Total input files to process : 8 +2025-12-16 06:21:44 INFO JobSubmitter:203 - number of splits:8 +2025-12-16 06:21:44 INFO JobSubmitter:299 - Submitting tokens for job: job_1765865564182_0003 +2025-12-16 06:21:44 INFO JobSubmitter:300 - Executing with tokens: [] +2025-12-16 06:21:44 INFO Configuration:2898 - resource-types.xml not found +2025-12-16 06:21:44 INFO ResourceUtils:476 - Unable to find 'resource-types.xml'. 
+2025-12-16 06:21:44 INFO YarnClientImpl:356 - Submitted application application_1765865564182_0003 +2025-12-16 06:21:44 INFO Job:1681 - The url to track the job: http://resourcemanager:8088/proxy/application_1765865564182_0003/ +2025-12-16 06:21:44 INFO Job:1726 - Running job: job_1765865564182_0003 +2025-12-16 06:21:52 INFO Job:1747 - Job job_1765865564182_0003 running in uber mode : false +2025-12-16 06:21:52 INFO Job:1754 - map 0% reduce 0% +2025-12-16 06:22:01 INFO Job:1754 - map 25% reduce 0% +2025-12-16 06:22:02 INFO Job:1754 - map 50% reduce 0% +2025-12-16 06:22:03 INFO Job:1754 - map 75% reduce 0% +2025-12-16 06:22:05 INFO Job:1754 - map 100% reduce 0% +2025-12-16 06:22:08 INFO Job:1754 - map 100% reduce 33% +2025-12-16 06:22:09 INFO Job:1754 - map 100% reduce 100% +2025-12-16 06:22:10 INFO Job:1765 - Job job_1765865564182_0003 completed successfully +2025-12-16 06:22:10 INFO Job:1772 - Counters: 56 + File System Counters + FILE: Number of bytes read=109453185 + FILE: Number of bytes written=222314427 + FILE: Number of read operations=0 + FILE: Number of large read operations=0 + FILE: Number of write operations=0 + HDFS: Number of bytes read=131421865 + HDFS: Number of bytes written=668 + HDFS: Number of read operations=39 + HDFS: Number of large read operations=0 + HDFS: Number of write operations=6 + HDFS: Number of bytes read erasure-coded=0 + Job Counters + Killed map tasks=1 + Launched map tasks=8 + Launched reduce tasks=3 + Rack-local map tasks=8 + Total time spent by all maps in occupied slots (ms)=30748 + Total time spent by all reduces in occupied slots (ms)=10511 + Total time spent by all map tasks (ms)=30748 + Total time spent by all reduce tasks (ms)=10511 + Total vcore-milliseconds taken by all map tasks=30748 + Total vcore-milliseconds taken by all reduce tasks=10511 + Total megabyte-milliseconds taken by all map tasks=31485952 + Total megabyte-milliseconds taken by all reduce tasks=10763264 + Map-Reduce Framework + Map input records=3600008 + Map output records=3600000 + Map output bytes=102253167 + Map output materialized bytes=109453311 + Input split bytes=784 + Combine input records=0 + Combine output records=0 + Reduce input groups=20 + Reduce shuffle bytes=109453311 + Reduce input records=3600000 + Reduce output records=20 + Spilled Records=7200000 + Shuffled Maps =24 + Failed Shuffles=0 + Merged Map outputs=24 + GC time elapsed (ms)=1669 + CPU time spent (ms)=39030 + Physical memory (bytes) snapshot=5830852608 + Virtual memory (bytes) snapshot=29852155904 + Total committed heap usage (bytes)=7422345216 + Peak Map Physical memory (bytes)=637300736 + Peak Map Virtual memory (bytes)=2714058752 + Peak Reduce Physical memory (bytes)=418201600 + Peak Reduce Virtual memory (bytes)=2722541568 + Shuffle Errors + BAD_ID=0 + CONNECTION=0 + IO_ERROR=0 + WRONG_LENGTH=0 + WRONG_MAP=0 + WRONG_REDUCE=0 + File Input Format Counters + Bytes Read=131421081 + File Output Format Counters + Bytes Written=668 + org.ifmo.app.SalesMapper$Counter + HEADER_SKIPPED=8 + +=== Starting Job 2: Sort by revenue (descending) === +2025-12-16 06:22:10 INFO DefaultNoHARMFailoverProxyProvider:64 - Connecting to ResourceManager at resourcemanager/172.19.0.4:8032 +2025-12-16 06:22:10 WARN JobResourceUploader:149 - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
+2025-12-16 06:22:10 INFO JobResourceUploader:907 - Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/hadoop/.staging/job_1765865564182_0004 +2025-12-16 06:22:10 INFO FileInputFormat:302 - Total input files to process : 3 +2025-12-16 06:22:11 INFO JobSubmitter:203 - number of splits:3 +2025-12-16 06:22:11 INFO JobSubmitter:299 - Submitting tokens for job: job_1765865564182_0004 +2025-12-16 06:22:11 INFO JobSubmitter:300 - Executing with tokens: [] +2025-12-16 06:22:12 INFO YarnClientImpl:356 - Submitted application application_1765865564182_0004 +2025-12-16 06:22:12 INFO Job:1681 - The url to track the job: http://resourcemanager:8088/proxy/application_1765865564182_0004/ +2025-12-16 06:22:12 INFO Job:1726 - Running job: job_1765865564182_0004 +2025-12-16 06:22:23 INFO Job:1747 - Job job_1765865564182_0004 running in uber mode : false +2025-12-16 06:22:23 INFO Job:1754 - map 0% reduce 0% +2025-12-16 06:22:30 INFO Job:1754 - map 100% reduce 0% +2025-12-16 06:22:35 INFO Job:1754 - map 100% reduce 100% +2025-12-16 06:22:35 INFO Job:1765 - Job job_1765865564182_0004 completed successfully +2025-12-16 06:22:35 INFO Job:1772 - Counters: 54 + File System Counters + FILE: Number of bytes read=594 + FILE: Number of bytes written=1241205 + FILE: Number of read operations=0 + FILE: Number of large read operations=0 + FILE: Number of write operations=0 + HDFS: Number of bytes read=980 + HDFS: Number of bytes written=668 + HDFS: Number of read operations=14 + HDFS: Number of large read operations=0 + HDFS: Number of write operations=2 + HDFS: Number of bytes read erasure-coded=0 + Job Counters + Launched map tasks=3 + Launched reduce tasks=1 + Rack-local map tasks=3 + Total time spent by all maps in occupied slots (ms)=9841 + Total time spent by all reduces in occupied slots (ms)=2248 + Total time spent by all map tasks (ms)=9841 + Total time spent by all reduce tasks (ms)=2248 + Total vcore-milliseconds taken by all map tasks=9841 + Total vcore-milliseconds taken by all reduce tasks=2248 + Total megabyte-milliseconds taken by all map tasks=10077184 + Total megabyte-milliseconds taken by all reduce tasks=2301952 + Map-Reduce Framework + Map input records=20 + Map output records=20 + Map output bytes=548 + Map output materialized bytes=606 + Input split bytes=312 + Combine input records=0 + Combine output records=0 + Reduce input groups=20 + Reduce shuffle bytes=606 + Reduce input records=20 + Reduce output records=20 + Spilled Records=40 + Shuffled Maps =3 + Failed Shuffles=0 + Merged Map outputs=3 + GC time elapsed (ms)=263 + CPU time spent (ms)=2000 + Physical memory (bytes) snapshot=1200467968 + Virtual memory (bytes) snapshot=10841509888 + Total committed heap usage (bytes)=1513095168 + Peak Map Physical memory (bytes)=296411136 + Peak Map Virtual memory (bytes)=2709536768 + Peak Reduce Physical memory (bytes)=320827392 + Peak Reduce Virtual memory (bytes)=2714923008 + Shuffle Errors + BAD_ID=0 + CONNECTION=0 + IO_ERROR=0 + WRONG_LENGTH=0 + WRONG_MAP=0 + WRONG_REDUCE=0 + File Input Format Counters + Bytes Read=668 + File Output Format Counters + Bytes Written=668 + +=== Job Complete === + +=== RESULTS === +clothing 4560302171.99 911487 +video games 4560108307.50 913326 +baby products 4541435362.25 907186 +beauty products 4533874327.85 906417 +gardening tools 4531880837.74 905841 +automotive 4529861310.74 904962 +music instruments 4512294466.14 902389 +furniture 4503986763.16 900244 +electronics 4497526631.04 903266 +pet supplies 4488741730.38 896724 +stationery 4481794912.39 898265 +home 
appliances 4473888361.73 895815
+sports equipment 4469387812.34 894287
+groceries 4466915230.97 895470
+footwear 4465574983.36 894424
+jewelry 4463823670.79 893980
+office equipment 4463564947.38 892370
+toys 4462453654.12 892741
+books 4457620825.95 890948
+health & wellness 4454082892.49 890475
diff --git a/config b/config
new file mode 100644
index 0000000..2236164
--- /dev/null
+++ b/config
@@ -0,0 +1,26 @@
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=/opt/hadoop
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=/opt/hadoop
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=/opt/hadoop
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..e5b5123
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,44 @@
+services:
+  namenode:
+    container_name: hadoop_namenode
+    image: apache/hadoop:3.4.1
+    hostname: namenode
+    command: ["hdfs", "namenode"]
+    ports:
+      - 9870:9870
+    env_file:
+      - ./config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+    volumes:
+      - ./:/opt/hadoop-project
+
+  datanode:
+    container_name: hadoop_datanode
+    image: apache/hadoop:3.4.1
+    command: ["hdfs", "datanode"]
+    env_file:
+      - ./config
+    volumes:
+      - ./:/opt/hadoop-project
+
+  resourcemanager:
+    container_name: hadoop_resourcemanager
+    image: apache/hadoop:3.4.1
+    hostname: resourcemanager
+    command: ["yarn", "resourcemanager"]
+    ports:
+      - 8088:8088
+    env_file:
+      - ./config
+    volumes:
+      - ./:/opt/hadoop-project
+
+  nodemanager:
+    container_name: hadoop_nodemanager
+    image: apache/hadoop:3.4.1
+    command: ["yarn", "nodemanager"]
+    env_file:
+      - ./config
+    volumes:
+      - ./:/opt/hadoop-project
diff --git a/flake.lock b/flake.lock
new file mode 100644
index 0000000..bc3cd0c
--- /dev/null
+++ b/flake.lock
@@ -0,0 +1,61 @@
+{
+  "nodes": {
+    "flake-utils": {
+      "inputs": {
+        "systems": "systems"
+      },
+      "locked": {
+        "lastModified": 1731533236,
"sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=", + "owner": "numtide", + "repo": "flake-utils", + "rev": "11707dc2f618dd54ca8739b309ec4fc024de578b", + "type": "github" + }, + "original": { + "owner": "numtide", + "repo": "flake-utils", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1765762245, + "narHash": "sha256-3iXM/zTqEskWtmZs3gqNiVtRTsEjYAedIaLL0mSBsrk=", + "owner": "NixOS", + "repo": "nixpkgs", + "rev": "c8cfcd6ccd422e41cc631a0b73ed4d5a925c393d", + "type": "github" + }, + "original": { + "owner": "NixOS", + "ref": "nixos-25.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "flake-utils": "flake-utils", + "nixpkgs": "nixpkgs" + } + }, + "systems": { + "locked": { + "lastModified": 1681028828, + "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=", + "owner": "nix-systems", + "repo": "default", + "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e", + "type": "github" + }, + "original": { + "owner": "nix-systems", + "repo": "default", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..92608a3 --- /dev/null +++ b/flake.nix @@ -0,0 +1,36 @@ +{ + description = "lab3-dmfrpro"; + + inputs = { + nixpkgs.url = "github:NixOS/nixpkgs/nixos-25.11"; + flake-utils.url = "github:numtide/flake-utils"; + }; + + outputs = { self, nixpkgs, flake-utils }: + flake-utils.lib.eachDefaultSystem (system: + let + pkgs = import nixpkgs { + inherit system; + config = { + allowUnfree = true; + permittedInsecurePackages = [ + "openjdk-8u472-b08" + "openjdk-8u472-b08-jre" + ]; + }; + }; + in + { + devShells.default = pkgs.mkShell { + buildInputs = with pkgs; [ + javaPackages.compiler.openjdk8 + maven + docker + docker-compose + ]; + + JAVA_HOME = "${pkgs.javaPackages.compiler.openjdk8.home}"; + MAVEN_OPTS = "-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true"; + }; + }); +} diff --git a/0.csv b/input/0.csv similarity index 100% rename from 0.csv rename to input/0.csv diff --git a/1.csv b/input/1.csv similarity index 100% rename from 1.csv rename to input/1.csv diff --git a/2.csv b/input/2.csv similarity index 100% rename from 2.csv rename to input/2.csv diff --git a/3.csv b/input/3.csv similarity index 100% rename from 3.csv rename to input/3.csv diff --git a/4.csv b/input/4.csv similarity index 100% rename from 4.csv rename to input/4.csv diff --git a/5.csv b/input/5.csv similarity index 100% rename from 5.csv rename to input/5.csv diff --git a/6.csv b/input/6.csv similarity index 100% rename from 6.csv rename to input/6.csv diff --git a/7.csv b/input/7.csv similarity index 100% rename from 7.csv rename to input/7.csv diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..83ff1d1 --- /dev/null +++ b/pom.xml @@ -0,0 +1,73 @@ + + + 4.0.0 + org.ifmo.app + lab3-dmfrpro + jar + 1.0-SNAPSHOT + lab3-dmfrpro + http://maven.apache.org + + + 8 + 8 + 3.4.1 + + + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${maven.compiler.source} + ${maven.compiler.target} + + + + org.apache.maven.plugins + maven-assembly-plugin + + + + org.ifmo.app.SalesAnalysisDriver + + + + jar-with-dependencies + + + + + package + + single + + + + + + + diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..f2d0bdc --- /dev/null 
+++ b/run.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+echo "Starting cluster..."
+docker-compose up -d
+sleep 5
+
+CONTAINER=$(docker-compose ps -q namenode)
+
+echo "Setting up HDFS..."
+docker exec $CONTAINER bash -c "
+    # Clean everything
+    hdfs dfs -rm -r /sales_input /sales_temp /sales_result 2>/dev/null || true
+
+    # Create and upload to sales_input
+    hdfs dfs -mkdir -p /sales_input
+    for f in /opt/hadoop-project/input/*.csv; do
+        if [ -f \"\$f\" ]; then
+            hdfs dfs -put \"\$f\" /sales_input/
+        fi
+    done
+
+    echo 'Uploaded files:'
+    hdfs dfs -ls /sales_input
+"
+
+echo "Compiling..."
+mvn clean package
+
+echo "Running job..."
+docker exec $CONTAINER bash -c "
+    yarn jar /opt/hadoop-project/target/lab3-dmfrpro-1.0-SNAPSHOT-jar-with-dependencies.jar \
+        org.ifmo.app.SalesAnalysisDriver \
+        /sales_input \
+        /sales_temp \
+        /sales_result
+"
+
+echo -e "\n=== RESULTS ==="
+docker exec $CONTAINER hdfs dfs -cat /sales_result/part-r-00000
diff --git a/src/main/java/org/ifmo/app/SalesAnalysisDriver.java b/src/main/java/org/ifmo/app/SalesAnalysisDriver.java
new file mode 100644
index 0000000..ea17586
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SalesAnalysisDriver.java
@@ -0,0 +1,109 @@
+package org.ifmo.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import java.net.URI;
+
+public class SalesAnalysisDriver {
+
+    public static class DescendingDoubleComparator extends WritableComparator {
+        protected DescendingDoubleComparator() {
+            super(DoubleWritable.class, true);
+        }
+
+        @SuppressWarnings("rawtypes")
+        @Override
+        public int compare(WritableComparable w1, WritableComparable w2) {
+            DoubleWritable key1 = (DoubleWritable) w1;
+            DoubleWritable key2 = (DoubleWritable) w2;
+            return -1 * key1.compareTo(key2);
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+        if (args.length != 4) {
+            System.err.println("ERROR: Need exactly 3 path arguments!");
+            System.err.println("Usage: SalesAnalysisDriver <input_dir> <intermediate_dir> <result_dir>");
+            System.err.println("Example: SalesAnalysisDriver /input /output/temp /output/result");
+            System.exit(-1);
+        }
+
+        // args[0] is the main class name that `yarn jar` forwards when the jar
+        // manifest already names the main class; the three paths follow it.
+        String inputPath = args[1];
+        String intermediatePath = args[2];
+        String finalPath = args[3];
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(conf);
+
+        // Clean output directories if they exist
+        System.out.println("Cleaning output directories...");
+        fs.delete(new Path(intermediatePath), true);
+        fs.delete(new Path(finalPath), true);
+
+        System.out.println("\n=== Starting Job 1: Calculate revenue per category ===");
+
+        Job job1 = Job.getInstance(conf, "Sales Analysis");
+        job1.setJarByClass(SalesAnalysisDriver.class);
+
+        job1.setMapperClass(SalesMapper.class);
+        job1.setReducerClass(SalesReducer.class);
+
+        job1.setMapOutputKeyClass(Text.class);
+        job1.setMapOutputValueClass(SalesRecordWritable.class);
+        job1.setOutputKeyClass(Text.class);
+        job1.setOutputValueClass(SalesRecordWritable.class);
+
+        job1.setInputFormatClass(TextInputFormat.class);
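+        // Job 1 output is plain text via TextOutputFormat: one line per category of
+        // the form <category> \t <revenue> \t <quantity> (SalesRecordWritable.toString()),
+        // which is exactly the layout that Job 2's SortMapper splits on "\t" later.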
+        job1.setOutputFormatClass(TextOutputFormat.class);
+
+        FileInputFormat.addInputPath(job1, new Path(inputPath));
+        FileOutputFormat.setOutputPath(job1, new Path(intermediatePath));
+
+        job1.setNumReduceTasks(3);
+
+        boolean success = job1.waitForCompletion(true);
+
+        if (!success) {
+            System.err.println("Job 1 failed!");
+            System.exit(1);
+        }
+
+        System.out.println("\n=== Starting Job 2: Sort by revenue (descending) ===");
+
+        Job job2 = Job.getInstance(conf, "Sort by Revenue");
+        job2.setJarByClass(SalesAnalysisDriver.class);
+
+        job2.setMapperClass(SortMapper.class);
+        job2.setReducerClass(SortReducer.class);
+
+        job2.setMapOutputKeyClass(DoubleWritable.class);
+        job2.setMapOutputValueClass(Text.class);
+        job2.setOutputKeyClass(Text.class);
+        job2.setOutputValueClass(Text.class);
+
+        job2.setInputFormatClass(TextInputFormat.class);
+        job2.setOutputFormatClass(TextOutputFormat.class);
+        job2.setSortComparatorClass(DescendingDoubleComparator.class);
+
+        FileInputFormat.addInputPath(job2, new Path(intermediatePath));
+        FileOutputFormat.setOutputPath(job2, new Path(finalPath));
+
+        // Single reducer for global sort
+        job2.setNumReduceTasks(1);
+
+        success = job2.waitForCompletion(true);
+
+        System.out.println("\n=== Job Complete ===");
+        System.exit(success ? 0 : 1);
+    }
+}
diff --git a/src/main/java/org/ifmo/app/SalesMapper.java b/src/main/java/org/ifmo/app/SalesMapper.java
new file mode 100644
index 0000000..5d9eca5
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SalesMapper.java
@@ -0,0 +1,51 @@
+package org.ifmo.app;
+
+import java.io.IOException;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class SalesMapper extends Mapper<LongWritable, Text, Text, SalesRecordWritable> {
+
+    private Text category = new Text();
+    private SalesRecordWritable record = new SalesRecordWritable();
+
+    public static enum Counter {
+        BAD_RECORDS,
+        HEADER_SKIPPED
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+
+        String line = value.toString();
+
+        if (key.get() == 0 && line.startsWith("transaction_id")) {
+            context.getCounter(Counter.HEADER_SKIPPED).increment(1);
+            return;
+        }
+
+        String[] fields = line.split(",");
+
+        if (fields.length >= 5) {
+            try {
+                String categoryStr = fields[2].trim();
+                double price = Double.parseDouble(fields[3].trim());
+                long quantity = Long.parseLong(fields[4].trim());
+                double revenue = price * quantity;
+
+                category.set(categoryStr);
+                record = new SalesRecordWritable(revenue, quantity);
+
+                context.write(category, record);
+            } catch (NumberFormatException e) {
+                context.getCounter(Counter.BAD_RECORDS).increment(1);
+                System.err.println("Invalid row: " + line);
+            }
+        } else {
+            context.getCounter(Counter.BAD_RECORDS).increment(1);
+            System.err.println("Row does not have enough fields: " + line);
+        }
+    }
+}
diff --git a/src/main/java/org/ifmo/app/SalesRecordWritable.java b/src/main/java/org/ifmo/app/SalesRecordWritable.java
new file mode 100644
index 0000000..5b86457
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SalesRecordWritable.java
@@ -0,0 +1,51 @@
+package org.ifmo.app;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+
+public class SalesRecordWritable implements Writable {
+    private double revenue;
+    private long quantity;
+
+    public SalesRecordWritable() {
+        this.revenue = 0.0;
+        this.quantity = 0;
+    }
+
+    public SalesRecordWritable(double revenue, long quantity) {
+        this.revenue = revenue;
+        this.quantity = quantity;
+    }
+
+    public void add(SalesRecordWritable other) {
+        this.revenue += other.revenue;
+        this.quantity += other.quantity;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeDouble(revenue);
+        out.writeLong(quantity);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        revenue = in.readDouble();
+        quantity = in.readLong();
+    }
+
+    public double getRevenue() {
+        return revenue;
+    }
+
+    public long getQuantity() {
+        return quantity;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%.2f\t%d", revenue, quantity);
+    }
+}
diff --git a/src/main/java/org/ifmo/app/SalesReducer.java b/src/main/java/org/ifmo/app/SalesReducer.java
new file mode 100644
index 0000000..9413e79
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SalesReducer.java
@@ -0,0 +1,26 @@
+package org.ifmo.app;
+
+import java.io.IOException;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class SalesReducer extends Reducer<Text, SalesRecordWritable, Text, SalesRecordWritable> {
+
+    private SalesRecordWritable result = new SalesRecordWritable();
+
+    @Override
+    protected void reduce(Text key, Iterable<SalesRecordWritable> values, Context context)
+            throws IOException, InterruptedException {
+
+        double totalRevenue = 0.0;
+        long totalQuantity = 0;
+
+        for (SalesRecordWritable val : values) {
+            totalRevenue += val.getRevenue();
+            totalQuantity += val.getQuantity();
+        }
+
+        result = new SalesRecordWritable(totalRevenue, totalQuantity);
+        context.write(key, result);
+    }
+}
diff --git a/src/main/java/org/ifmo/app/SortMapper.java b/src/main/java/org/ifmo/app/SortMapper.java
new file mode 100644
index 0000000..e871183
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SortMapper.java
@@ -0,0 +1,36 @@
+package org.ifmo.app;
+
+import java.io.IOException;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, Text> {
+
+    private DoubleWritable revenueKey = new DoubleWritable();
+    private Text outputValue = new Text();
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+            throws IOException, InterruptedException {
+
+        String line = value.toString();
+        String[] parts = line.split("\t");
+
+        if (parts.length >= 3) {
+            try {
+                String category = parts[0];
+                double revenue = Double.parseDouble(parts[1]);
+                String quantity = parts[2];
+
+                revenueKey.set(revenue);
+                outputValue.set(category + "\t" + quantity);
+
+                context.write(revenueKey, outputValue);
+            } catch (NumberFormatException e) {
+                System.err.println("Invalid revenue value: " + line);
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/ifmo/app/SortReducer.java b/src/main/java/org/ifmo/app/SortReducer.java
new file mode 100644
index 0000000..06b9ede
--- /dev/null
+++ b/src/main/java/org/ifmo/app/SortReducer.java
@@ -0,0 +1,27 @@
+package org.ifmo.app;
+
+import java.io.IOException;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+public class SortReducer extends Reducer<DoubleWritable, Text, Text, Text> {
+
+    @Override
+    protected void reduce(DoubleWritable key, Iterable<Text> values, Context context)
+            throws IOException, InterruptedException {
+
+        double revenue = key.get();
+
+        for (Text val : values) {
+            String[] parts = val.toString().split("\t");
+            String category = parts[0];
+            String quantity = parts[1];
+
+            context.write(
+                new Text(category),
+                new Text(String.format("%.2f\t%s",
+                        revenue, quantity))
+            );
+        }
+    }
+}
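
The Job 1 counters above show `Combine input records=0`, so all 3.6 million map outputs (about 109 MB of materialized bytes) cross the shuffle unaggregated. Since SalesReducer only sums revenue and quantity, and its input and output types are both (Text, SalesRecordWritable), the same class could in principle also be registered as a combiner to pre-aggregate per-category totals on the map side. A sketch of that optional one-line change in the Job 1 setup (not part of the code above):

// Optional tweak: reuse SalesReducer as a map-side combiner. Summing revenue
// and quantity is associative and commutative, so per-split partial totals are
// safe; the shuffle would then carry roughly 20 categories per map task instead
// of every input record.
job1.setCombinerClass(SalesReducer.class);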
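
Both submissions in the log also warn: "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." A minimal sketch of how the driver could be restructured around ToolRunner so that generic options such as -D key=value are honored; the class name SalesAnalysisTool is illustrative, and the existing job setup is assumed to move unchanged into run():

package org.ifmo.app;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sketch: the body of SalesAnalysisDriver.main() would move into run(), using
// getConf() instead of new Configuration(), so options passed on the command
// line (e.g. -D mapreduce.job.reduces=3) are applied automatically.
public class SalesAnalysisTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // ... build and submit job1 and job2 exactly as in SalesAnalysisDriver ...
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options before handing the rest to run().
        System.exit(ToolRunner.run(new Configuration(), new SalesAnalysisTool(), args));
    }
}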
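
Because SalesRecordWritable and DescendingDoubleComparator carry the correctness of the whole pipeline (the binary shuffle format and the descending global order), a tiny local check can exercise them without a cluster. A sketch, assuming it lives in the same org.ifmo.app package (the comparator's constructor is package-accessible there); the class name LocalSanityCheck is hypothetical and the sample numbers are taken from the results above:

package org.ifmo.app;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.io.DoubleWritable;

// Round-trips SalesRecordWritable through write()/readFields() and checks that
// the descending comparator puts the larger revenue first.
public class LocalSanityCheck {
    public static void main(String[] args) throws Exception {
        SalesRecordWritable original = new SalesRecordWritable(4560302171.99, 911487);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        SalesRecordWritable restored = new SalesRecordWritable();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(restored);  // expected: 4560302171.99 <TAB> 911487

        SalesAnalysisDriver.DescendingDoubleComparator cmp =
                new SalesAnalysisDriver.DescendingDoubleComparator();
        int order = cmp.compare(new DoubleWritable(4560302171.99), new DoubleWritable(4454082892.49));
        System.out.println(order < 0);  // expected: true -- larger revenue sorts first
    }
}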