From d8044ebbe83da4feed4771f8c3b6165f053a546d Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Sat, 13 Dec 2025 13:10:27 +0000
Subject: [PATCH 1/4] Setting up GitHub Classroom Feedback

From 640d8c105767f31030e9e40bd8856f1af4564e39 Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Sat, 13 Dec 2025 13:10:29 +0000
Subject: [PATCH 2/4] add deadline

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 948dc85..84b20f4 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/uyodabcP)
 ## Lab: Implementing MapReduce for Sales Data Analysis with Hadoop
 # Objective

From 990921e9de393fd421a0caeac9dd7324a9f6d7ed Mon Sep 17 00:00:00 2001
From: railolog
Date: Tue, 23 Dec 2025 01:12:10 +0300
Subject: [PATCH 3/4] implement hadoop jobs

---
 .gitignore                                    |   2 +
 .lfsconfig                                    |   3 +
 docker-compose.yml                            |  84 +++++++++++++++
 pom.xml                                       | 100 +++++++++++++++++
 rerun.sh                                      |  79 ++++++++++++++
 run.sh                                        | 101 ++++++++++++++++++
 src/main/java/ru/ifmo/sales/App.java          |  61 +++++++++++
 src/main/java/ru/ifmo/sales/sort/SortJob.java |  90 ++++++++++++++++
 .../java/ru/ifmo/sales/sort/SortMapper.java   |  36 +++++++
 .../java/ru/ifmo/sales/sort/SortReducer.java  |  23 ++++
 .../ifmo/sales/sort/SortResultsWritable.java  |  39 +++++++
 .../java/ru/ifmo/sales/stats/StatsJob.java    |  60 +++++++++++
 .../java/ru/ifmo/sales/stats/StatsMapper.java |  43 ++++++++
 .../ru/ifmo/sales/stats/StatsReducer.java     |  32 ++++++
 .../ru/ifmo/sales/stats/StatsWritable.java    |  36 +++++++
 stats.md                                      |  32 ++++++
 16 files changed, 821 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 .lfsconfig
 create mode 100644 docker-compose.yml
 create mode 100644 pom.xml
 create mode 100755 rerun.sh
 create mode 100755 run.sh
 create mode 100644 src/main/java/ru/ifmo/sales/App.java
 create mode 100644 src/main/java/ru/ifmo/sales/sort/SortJob.java
 create mode 100644 src/main/java/ru/ifmo/sales/sort/SortMapper.java
 create mode 100644 src/main/java/ru/ifmo/sales/sort/SortReducer.java
 create mode 100644 src/main/java/ru/ifmo/sales/sort/SortResultsWritable.java
 create mode 100644 src/main/java/ru/ifmo/sales/stats/StatsJob.java
 create mode 100644 src/main/java/ru/ifmo/sales/stats/StatsMapper.java
 create mode 100644 src/main/java/ru/ifmo/sales/stats/StatsReducer.java
 create mode 100644 src/main/java/ru/ifmo/sales/stats/StatsWritable.java
 create mode 100644 stats.md

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bba7b53
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/.idea/

diff --git a/.lfsconfig b/.lfsconfig
new file mode 100644
index 0000000..551e9ed
--- /dev/null
+++ b/.lfsconfig
@@ -0,0 +1,3 @@
+[lfs]
+  url = https://github.com/AdvancedJavaLabs/lab4-parallel.git/info/lfs
+

diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..992090e
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,84 @@
+version: '3.8'
+
+services:
+  namenode:
+    image: bde2020/hadoop-namenode:latest
+    container_name: namenode
+    environment:
+      - CLUSTER_NAME=hadoop_namenode
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+      - HDFS_CONF_dfs_replication=1
+      - HDFS_CONF_dfs_permissions_enabled=false
+    ports:
+      - "9870:9870"  # HDFS Web UI
+      - "9000:9000"
+      - "8020:9000"
+    volumes:
+      - namenode_data:/hadoop/dfs/name
+    healthcheck:
+      test: [ "CMD", "curl", "-f", "http://localhost:9870" ]
+      interval: 30s
+      timeout: 10s
+      retries: 300
+
+  datanode:
+    image: bde2020/hadoop-datanode:latest
+    container_name: datanode
+    environment:
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+      - HDFS_CONF_dfs_datanode_data_dir=file:///hadoop/dfs/data
+    volumes:
+      - datanode_data:/hadoop/dfs/data
+    depends_on:
+      namenode:
+        condition: service_healthy
+    ports:
+      - "9864:9864"
+
+  resourcemanager:
+    image: bde2020/hadoop-resourcemanager:latest
+    container_name: resourcemanager
+    environment:
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+      - YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
+      - YARN_CONF_yarn_nodemanager_aux-services=mapreduce_shuffle
+      - YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
+      - YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
+      - YARN_CONF_yarn_resourcemanager_resource_tracker_address=resourcemanager:8031
+      - YARN_CONF_yarn_resourcemanager_admin_address=resourcemanager:8033
+      - YARN_CONF_yarn_resourcemanager_webapp_address=0.0.0.0:8088
+      - HADOOP_CONF_DIR=/etc/hadoop
+    ports:
+      - "8088:8088"
+      - "8032:8032"
+      - "8030:8030"
+      - "8031:8031"
+    depends_on:
+      - namenode
+      - datanode
+
+  nodemanager:
+    image: bde2020/hadoop-nodemanager:latest
+    container_name: nodemanager
+    environment:
+      - CORE_CONF_fs_defaultFS=hdfs://namenode:9000
+      - YARN_CONF_yarn_resourcemanager_hostname=resourcemanager
+      - YARN_CONF_yarn_nodemanager_resource_memory_mb=4096
+      - YARN_CONF_yarn_nodemanager_resource_cpu_vcores=4
+      - YARN_CONF_yarn_nodemanager_aux-services=mapreduce_shuffle
+      - YARN_CONF_yarn_nodemanager_address=0.0.0.0:8041
+      - YARN_CONF_yarn_nodemanager_webapp_address=0.0.0.0:8042
+      - YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032
+      - YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030
+      - YARN_CONF_yarn_resourcemanager_resource_tracker_address=resourcemanager:8031
+      - YARN_CONF_yarn_resourcemanager_admin_address=resourcemanager:8033
+      - HADOOP_CONF_DIR=/etc/hadoop
+    ports:
+      - "8042:8042"
+    depends_on:
+      - resourcemanager
+      - datanode
+
+volumes:
+  namenode_data:
+  datanode_data:

diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f15c708
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>ru.ifmo</groupId>
+    <artifactId>lab3</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <name>lab3</name>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.4.12</version>
+    </parent>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <start-class>ru.ifmo.sales.App</start-class>
+        <java.version>8</java.version>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <maven.compiler.release>8</maven.compiler.release>
+        <lombok.version>1.18.30</lombok.version>
+        <hadoop.version>3.3.6</hadoop.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>${lombok.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <argLine>--add-opens java.base/java.lang=ALL-UNNAMED</argLine>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

diff --git a/rerun.sh b/rerun.sh
new file mode 100755
index 0000000..87da0b9
--- /dev/null
+++ b/rerun.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+set -e
+
+echo "Running MapReduce sales analysis"
+
+REDUCERS=4
+BLOCK_SIZE_KB=1024
+
+read -p "Number of reducers [$REDUCERS]: " user_reducers
+read -p "Block size in KB [$BLOCK_SIZE_KB]: " user_block_size
+
+REDUCERS=${user_reducers:-$REDUCERS}
+BLOCK_SIZE_KB=${user_block_size:-$BLOCK_SIZE_KB}
+
+echo "1. Starting the Hadoop cluster..."
+docker-compose up -d
+
+echo "Waiting for NameNode to start..."
+while ! curl -s http://localhost:9870 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for DataNode to start..."
+while ! curl -s http://localhost:9864 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for ResourceManager to start..."
+while ! curl -s http://localhost:8088 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for NodeManager to start..."
+while ! curl -s http://localhost:8042 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "2. Building the project..."
+mvn clean package
+
+echo "3. Preparing HDFS..."
+
+docker exec namenode hdfs dfs -rm -r /output /output-temp 2>/dev/null || true
+
+echo "4. Preparing the JAR file..."
+docker cp ./target/lab3-1.0.jar namenode:/tmp/sales-app.jar
+
+echo "5. Submitting the MapReduce job to YARN..."
+
+docker exec namenode hadoop jar /tmp/sales-app.jar /input /output $REDUCERS $BLOCK_SIZE_KB
+
+echo "6. Fetching results..."
+
+if docker exec namenode hdfs dfs -test -e /output/_SUCCESS; then
+  echo "Job completed successfully!"
+  > results.txt
+  echo "Category,Revenue,Quantity" >> results.txt
+
+  docker exec namenode hdfs dfs -ls /output/part-r-* 2>/dev/null | \
+    awk '{print $NF}' | \
+    while read file; do
+      docker exec namenode hdfs dfs -cat "$file" 2>/dev/null >> results.txt
+      echo "" >> results.txt
+    done
+
+  echo "Results saved to results.txt"
+else
+  echo "Error: the job did not finish successfully"
+  exit 1
+fi

diff --git a/run.sh b/run.sh
new file mode 100755
index 0000000..7b5cbfb
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,101 @@
+#!/bin/bash
+
+set -e
+
+echo "Running MapReduce sales analysis"
+
+REDUCERS=4
+BLOCK_SIZE_KB=1024
+
+read -p "Number of reducers [$REDUCERS]: " user_reducers
+read -p "Block size in KB [$BLOCK_SIZE_KB]: " user_block_size
+
+REDUCERS=${user_reducers:-$REDUCERS}
+BLOCK_SIZE_KB=${user_block_size:-$BLOCK_SIZE_KB}
+
+echo "1. Starting the Hadoop cluster..."
+docker-compose up -d
+
+echo "Waiting for NameNode to start..."
+while ! curl -s http://localhost:9870 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for DataNode to start..."
+while ! curl -s http://localhost:9864 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for ResourceManager to start..."
+while ! curl -s http://localhost:8088 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "Waiting for NodeManager to start..."
+while ! curl -s http://localhost:8042 > /dev/null; do
+  sleep 2
+  echo -n "."
+done
+echo " Done!"
+
+echo "2. Building the project..."
+mvn clean package
+
+echo "3. Preparing HDFS..."
+
+docker exec namenode hdfs dfs -rm -r /input /output /output-temp 2>/dev/null || true
+docker exec namenode hdfs dfs -mkdir -p /input
+
+
+echo "4. Copying CSV files into HDFS..."
+
+# Run the glob inside the container; otherwise /tmp/*.csv is expanded on the host.
+docker exec namenode sh -c 'rm -rf /tmp/*.csv' 2>/dev/null || true
+
+for csv_file in ./*.csv; do
+  if [ -f "$csv_file" ]; then
+    filename=$(basename "$csv_file")
+    echo "Copying $filename..."
+
+    docker cp "$csv_file" namenode:/tmp/
+    docker exec namenode hdfs dfs -put -f /tmp/"$filename" /input/
+  fi
+done
+
+echo "5. Preparing the JAR file..."
+docker cp ./target/lab3-1.0.jar namenode:/tmp/sales-app.jar
+
+echo "6. Submitting the MapReduce job to YARN..."
+
+docker exec namenode hadoop jar /tmp/sales-app.jar /input /output $REDUCERS $BLOCK_SIZE_KB
+
+echo "7. Fetching results..."
+
+if docker exec namenode hdfs dfs -test -e /output/_SUCCESS; then
+  echo "Job completed successfully!"
+  > results.txt
+  echo "Category,Revenue,Quantity" >> results.txt
+
+  docker exec namenode hdfs dfs -ls /output/part-r-* 2>/dev/null | \
+    awk '{print $NF}' | \
+    while read file; do
+      docker exec namenode hdfs dfs -cat "$file" 2>/dev/null >> results.txt
+      echo "" >> results.txt
+    done
+
+  echo "Results saved to results.txt"
+else
+  echo "Error: the job did not finish successfully"
+  exit 1
+fi
+
+read -p "Stop the Hadoop cluster? (y/N): " stop_cluster
+if [[ $stop_cluster == "y" || $stop_cluster == "Y" ]]; then
+  echo "8. Stopping the cluster..."
+  docker-compose down
+fi

diff --git a/src/main/java/ru/ifmo/sales/App.java b/src/main/java/ru/ifmo/sales/App.java
new file mode 100644
index 0000000..4b8095e
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/App.java
@@ -0,0 +1,61 @@
+package ru.ifmo.sales;
+
+import java.util.Arrays;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import ru.ifmo.sales.stats.StatsJob;
+import ru.ifmo.sales.sort.SortJob;
+
+@Slf4j
+public class App {
+    public static void main(String[] args) throws Exception {
+        log.info("Got arguments: {}", Arrays.toString(args));
+
+        if (args.length < 2) {
+            System.err.println("Usage: App <inputDir> <outputDir> [numReducers] [blockSizeKB]");
+            System.exit(1);
+        }
+
+        String inputDirectory = args[0];
+        String outputDirectory = args[1];
+
+        int numberOfReducers = 2;
+        int blockSizeKB = 128;
+
+        if (args.length >= 3) numberOfReducers = Integer.parseInt(args[2]);
+        if (args.length >= 4) blockSizeKB = Integer.parseInt(args[3]);
+
+        Configuration configuration = new Configuration();
+        configuration.set("mapreduce.input.fileinputformat.split.maxsize", String.valueOf(blockSizeKB * 1024L));
+
+        String tempDirectory = outputDirectory + "-temp";
+
+        long startTime = System.currentTimeMillis();
+
+        log.info("Phase 1: Sales Analysis");
+        String[] salesArguments = {inputDirectory, tempDirectory, String.valueOf(numberOfReducers)};
+        int salesResult = ToolRunner.run(configuration, new StatsJob(), salesArguments);
+
+        if (salesResult != 0) {
+            log.error("Phase 1 completed with error");
+            System.exit(salesResult);
+        }
+
+        log.info("Phase 2: Revenue Sorting");
+        String[] sortArguments = {tempDirectory, outputDirectory, String.valueOf(numberOfReducers)};
+        int sortResult = ToolRunner.run(configuration, new SortJob(), sortArguments);
+
+        if (sortResult != 0) {
+            log.error("Phase 2 completed with error");
+            System.exit(sortResult);
+        }
+
+        long endTime = System.currentTimeMillis();
+        long duration = endTime - startTime;
+
+        log.info("Execution completed");
+        log.info("Total time: {} ms ({} sec)", duration, duration / 1000.0);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/ru/ifmo/sales/sort/SortJob.java b/src/main/java/ru/ifmo/sales/sort/SortJob.java
new file mode 100644
index 0000000..6dcd19c
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/sort/SortJob.java
@@ -0,0 +1,90 @@
+package ru.ifmo.sales.sort;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+
+
+@Slf4j
+public class SortJob implements Tool {
+
+    private Configuration configuration;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        String inputPath = args[0];
+        String outputPath = args[1];
+        int numberOfReducers = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+
+        log.info("Starting SortJob: input={}, output={}, reducers={}", inputPath, outputPath, numberOfReducers);
+
+        Job job = initJob();
+
+        FileInputFormat.addInputPath(job, new Path(inputPath));
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        boolean success = job.waitForCompletion(true);
+        return success ? 0 : 1;
+    }
+
+    private Job initJob() throws IOException {
+        Job job = Job.getInstance(configuration, "Sort Sales by Revenue");
+        job.setJarByClass(SortJob.class);
+
+        job.setMapperClass(SortMapper.class);
+        job.setReducerClass(SortReducer.class);
+
+        // The reduce-task count is not overridden here; with the default single
+        // reducer, the descending-by-revenue order is global across the output.
+        job.setSortComparatorClass(DoubleDecreasingComparator.class);
+
+        job.setMapOutputKeyClass(DoubleWritable.class);
+        job.setMapOutputValueClass(SortResultsWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+
+        return job;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.configuration = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return configuration;
+    }
+
+    static class DoubleDecreasingComparator extends WritableComparator {
+
+        public DoubleDecreasingComparator() {
+            super(DoubleWritable.class, true);
+        }
+
+        @SuppressWarnings("unchecked")
+        @Override
+        public int compare(WritableComparable a, WritableComparable b) {
+            DoubleWritable d1 = (DoubleWritable) a;
+            DoubleWritable d2 = (DoubleWritable) b;
+
+            return d2.compareTo(d1);
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
+            // Compare the serialized doubles directly, reversed for descending order.
+            double thisValue = readDouble(b1, s1);
+            double thatValue = readDouble(b2, s2);
+
+            return Double.compare(thatValue, thisValue);
+        }
+    }
+}
diff --git a/src/main/java/ru/ifmo/sales/sort/SortMapper.java b/src/main/java/ru/ifmo/sales/sort/SortMapper.java
new file mode 100644
index 0000000..09d6234
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/sort/SortMapper.java
@@ -0,0 +1,36 @@
+package ru.ifmo.sales.sort;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+@Slf4j
+public class SortMapper extends Mapper<LongWritable, Text, DoubleWritable, SortResultsWritable> {
+
+    private final DoubleWritable revenueKey = new DoubleWritable();
+    private final SortResultsWritable resultValue = new SortResultsWritable();
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        // Each input line is a StatsJob output record: category\trevenue\tquantity.
+        String line = value.toString();
+        String[] parts = line.split("\t");
+
+        if (parts.length >= 3) {
+            String category = parts[0];
+            double revenue = Double.parseDouble(parts[1]);
+            long quantity = Long.parseLong(parts[2]);
+
+            revenueKey.set(revenue);
+            resultValue.setCategory(category);
+            resultValue.setRevenue(revenue);
+            resultValue.setQuantity(quantity);
+
+            context.write(revenueKey, resultValue);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/ru/ifmo/sales/sort/SortReducer.java b/src/main/java/ru/ifmo/sales/sort/SortReducer.java
new file mode 100644
index 0000000..36e5f85
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/sort/SortReducer.java
@@ -0,0 +1,23 @@
+package ru.ifmo.sales.sort;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+
+@Slf4j
+public class SortReducer extends Reducer<DoubleWritable, SortResultsWritable, Text, Text> {
+
+    @Override
+    protected void reduce(DoubleWritable key, Iterable<SortResultsWritable> values, Context context) throws IOException, InterruptedException {
+
+        for (SortResultsWritable value : values) {
+            double revenue = key.get();
+            String outputValue = String.format("%.2f\t%d", revenue, value.getQuantity());
+            context.write(new Text(value.getCategory()), new Text(outputValue));
+        }
+    }
+}
diff --git a/src/main/java/ru/ifmo/sales/sort/SortResultsWritable.java b/src/main/java/ru/ifmo/sales/sort/SortResultsWritable.java
new file mode 100644
index 0000000..fa243a8
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/sort/SortResultsWritable.java
@@ -0,0 +1,39 @@
+package ru.ifmo.sales.sort;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.hadoop.io.Writable;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SortResultsWritable implements Writable {
+    private String category;
+    private double revenue;
+    private long quantity;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeDouble(revenue);
+        out.writeLong(quantity);
+        out.writeUTF(category);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        revenue = in.readDouble();
+        quantity = in.readLong();
+        category = in.readUTF();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s\t%.2f\t%d", category, revenue, quantity);
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/ru/ifmo/sales/stats/StatsJob.java b/src/main/java/ru/ifmo/sales/stats/StatsJob.java
new file mode 100644
index 0000000..a62a383
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/stats/StatsJob.java
@@ -0,0 +1,60 @@
+package ru.ifmo.sales.stats;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+
+
+@Slf4j
+public class StatsJob implements Tool {
+
+    private Configuration configuration;
+
+    @Override
+    public int run(String[] args) throws Exception {
+        String inputPath = args[0];
+        String outputPath = args[1];
+        int numberOfReducers = args.length > 2 ? Integer.parseInt(args[2]) : 1;
+
+        log.info("Starting StatsJob: input={}, output={}, reducers={}", inputPath, outputPath, numberOfReducers);
+
+        Job job = initJob(numberOfReducers);
+
+        FileInputFormat.addInputPath(job, new Path(inputPath));
+        FileOutputFormat.setOutputPath(job, new Path(outputPath));
+
+        boolean success = job.waitForCompletion(true);
+        return success ? 0 : 1;
+    }
+
+    private Job initJob(int numberOfReducers) throws Exception {
+        Job job = Job.getInstance(configuration, "Sales Analysis");
+        job.setJarByClass(StatsJob.class);
+
+        job.setMapperClass(StatsMapper.class);
+        job.setReducerClass(StatsReducer.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(StatsWritable.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(StatsWritable.class);
+
+        return job;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.configuration = conf;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return configuration;
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/ru/ifmo/sales/stats/StatsMapper.java b/src/main/java/ru/ifmo/sales/stats/StatsMapper.java
new file mode 100644
index 0000000..23ce826
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/stats/StatsMapper.java
@@ -0,0 +1,43 @@
+package ru.ifmo.sales.stats;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+@Slf4j
+public class StatsMapper extends Mapper<LongWritable, Text, Text, StatsWritable> {
+
+    private final Text categoryKey = new Text();
+    private final StatsWritable salesValue = new StatsWritable();
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+        String line = value.toString();
+
+        // Skip the CSV header row.
+        if (line.startsWith("transaction_id")) {
+            return;
+        }
+
+        String[] fields = line.split(",");
+        if (fields.length < 5) {
+            log.warn("Malformed line: {}", line);
+            return;
+        }
+
+        // Columns used (0-based): 2 = category, 3 = price, 4 = quantity.
+        String category = fields[2].trim();
+        double price = Double.parseDouble(fields[3].trim());
+        long quantity = Long.parseLong(fields[4].trim());
+        double revenue = price * quantity;
+
+        categoryKey.set(category);
+        salesValue.setRevenue(revenue);
+        salesValue.setQuantity(quantity);
+
+        context.write(categoryKey, salesValue);
+    }
+}
diff --git a/src/main/java/ru/ifmo/sales/stats/StatsReducer.java b/src/main/java/ru/ifmo/sales/stats/StatsReducer.java
new file mode 100644
index 0000000..09577c5
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/stats/StatsReducer.java
@@ -0,0 +1,32 @@
+package ru.ifmo.sales.stats;
+
+import java.io.IOException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+
+@Slf4j
+public class StatsReducer extends Reducer<Text, StatsWritable, Text, StatsWritable> {
+
+    private final StatsWritable result = new StatsWritable();
+
+    @Override
+    protected void reduce(Text key, Iterable<StatsWritable> values, Context context)
+            throws IOException, InterruptedException {
+
+        double totalRevenue = 0.0;
+        long totalQuantity = 0L;
+
+        for (StatsWritable value : values) {
+            totalRevenue += value.getRevenue();
+            totalQuantity += value.getQuantity();
+        }
+
+        result.setRevenue(totalRevenue);
+        result.setQuantity(totalQuantity);
+
+        context.write(key, result);
+    }
+}
diff --git a/src/main/java/ru/ifmo/sales/stats/StatsWritable.java b/src/main/java/ru/ifmo/sales/stats/StatsWritable.java
new file mode 100644
index 0000000..88a6c89
--- /dev/null
+++ b/src/main/java/ru/ifmo/sales/stats/StatsWritable.java
@@ -0,0 +1,36 @@
+package ru.ifmo.sales.stats;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.hadoop.io.Writable;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class StatsWritable implements Writable {
+    private double revenue;
+    private long quantity;
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeDouble(revenue);
+        out.writeLong(quantity);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        revenue = in.readDouble();
+        quantity = in.readLong();
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%.2f\t%d", revenue, quantity);
+    }
+}
\ No newline at end of file
diff --git a/stats.md b/stats.md
new file mode 100644
index 0000000..d0feced
--- /dev/null
+++ b/stats.md
@@ -0,0 +1,32 @@
+# Benchmark Results
+
+| Workers | Block Size (KB) | Time (ms) |
+|---------|-----------------|-----------|
+| 1       | 512             | 142850    |
+| 1       | 1024            | 104973    |
+| 1       | 3072            | 96420     |
+| 1       | 4096            | 98150     |
+| 1       | 8192            | 94680     |
+| 1       | 16384           | 89240     |
+| 1       | 32768           | 90115     |
+| 4       | 512             | 137136    |
+| 4       | 1024            | 118340    |
+| 4       | 3072            | 104469    |
+| 4       | 4096            | 107135    |
+| 4       | 8192            | 99235     |
+| 4       | 16384           | 84341     |
+| 4       | 32768           | 86408     |
+| 8       | 512             | 141250    |
+| 8       | 1024            | 121580    |
+| 8       | 3072            | 102340    |
+| 8       | 4096            | 103820    |
+| 8       | 8192            | 95870     |
+| 8       | 16384           | 86540     |
+| 8       | 32768           | 87625     |
+| 16      | 512             | 146330    |
+| 16      | 1024            | 125670    |
+| 16      | 3072            | 108940    |
+| 16      | 4096            | 99682     |
+| 16      | 8192            | 93580     |
+| 16      | 16384           | 88150     |
+| 16      | 32768           | 88343     |
\ No newline at end of file

From 4186d11d5b4cf257773ef09ac0e768313be1f1af Mon Sep 17 00:00:00 2001
From: railolog
Date: Tue, 23 Dec 2025 01:23:58 +0300
Subject: [PATCH 4/4] try rm lfs

---
 .lfsconfig | 3 ---
 1 file changed, 3 deletions(-)
 delete mode 100644 .lfsconfig

diff --git a/.lfsconfig b/.lfsconfig
deleted file mode 100644
index 551e9ed..0000000
--- a/.lfsconfig
+++ /dev/null
@@ -1,3 +0,0 @@
-[lfs]
-  url = https://github.com/AdvancedJavaLabs/lab4-parallel.git/info/lfs
-