From 95a8cac4c277bd033170fcc221029043b0d02dfa Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Tue, 2 Dec 2025 10:38:08 +0000
Subject: [PATCH 1/3] Setting up GitHub Classroom Feedback

From 2ef980391b96fc13ad5b8ff0dc1cbf05f102f8db Mon Sep 17 00:00:00 2001
From: "github-classroom[bot]" <66690702+github-classroom[bot]@users.noreply.github.com>
Date: Tue, 2 Dec 2025 10:38:11 +0000
Subject: [PATCH 2/3] add deadline

---
 README.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/README.md b/README.md
index 948dc85..84b20f4 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,4 @@
+[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/uyodabcP)
 ## Lab assignment: implementing MapReduce for sales data analysis with Hadoop
 # Objective
 

From 7d36d7cda217a6607b44ce8a84e06299de639a87 Mon Sep 17 00:00:00 2001
From: Vlad Savikin
Date: Thu, 18 Dec 2025 13:54:22 +0300
Subject: [PATCH 3/3] V1

---
 .github/.keep                 |   0
 .gitignore                    |   2 +
 .lfsconfig                    |   2 +
 0.csv                         |   3 -
 1.csv                         |   3 -
 2.csv                         |   3 -
 3.csv                         |   3 -
 4.csv                         |   3 -
 5.csv                         |   3 -
 6.csv                         |   3 -
 7.csv                         |   3 -
 README.md                     |  48 -----------
 app/.gitignore                |  32 ++++++++
 app/.scalafmt.conf            |  14 ++++
 app/README.md                 |   8 ++
 app/build.sbt                 |  31 +++++++
 app/project/build.properties  |   1 +
 app/project/plugins.sbt       |   1 +
 app/src/main/scala/Main.scala | 149 ++++++++++++++++++++++++++++++++++
 config                        |  27 ++++++
 docker-compose.yaml           |  38 +++++++++
 21 files changed, 305 insertions(+), 72 deletions(-)
 delete mode 100644 .github/.keep
 create mode 100644 .gitignore
 create mode 100644 .lfsconfig
 delete mode 100644 0.csv
 delete mode 100644 1.csv
 delete mode 100644 2.csv
 delete mode 100644 3.csv
 delete mode 100644 4.csv
 delete mode 100644 5.csv
 delete mode 100644 6.csv
 delete mode 100644 7.csv
 delete mode 100644 README.md
 create mode 100644 app/.gitignore
 create mode 100644 app/.scalafmt.conf
 create mode 100644 app/README.md
 create mode 100644 app/build.sbt
 create mode 100644 app/project/build.properties
 create mode 100644 app/project/plugins.sbt
 create mode 100644 app/src/main/scala/Main.scala
 create mode 100644 config
 create mode 100644 docker-compose.yaml

diff --git a/.github/.keep b/.github/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2385b03
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+data/
+results/
diff --git a/.lfsconfig b/.lfsconfig
new file mode 100644
index 0000000..1315906
--- /dev/null
+++ b/.lfsconfig
@@ -0,0 +1,2 @@
+[lfs]
+  url = https://github.com/AdvancedJavaLabs/lab4-parallel.git/info/lfs
diff --git a/0.csv b/0.csv
deleted file mode 100644
index 2f7dfb7..0000000
--- a/0.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:eae61feac01cb36de9d6dd148a5b3fb76ad506fd8b283b20306df16f246e511e
-size 3406784
diff --git a/1.csv b/1.csv
deleted file mode 100644
index 3a258b0..0000000
--- a/1.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:7b2681409003d07d3ca063eb34339a0cdea9b688d16723b0e284091afd6bf806
-size 7078520
diff --git a/2.csv b/2.csv
deleted file mode 100644
index 4cdd100..0000000
--- a/2.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:ea9a50aa300dfe7e766eb419b2a963d31cfe68644acb23f654df7abe852d3a76
-size 10737171
diff --git a/3.csv b/3.csv
deleted file mode 100644
index afbe78d..0000000
--- a/3.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:6268fae7b50879a151a6f6de4852e4f39d2e22a8315290e87dcda71f4e10b866
-size 14530705
diff --git a/4.csv b/4.csv
deleted file mode 100644
index 9ff08df..0000000
--- a/4.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:b4adf0d42097f1b2c9e68731460c5a7c52cb7ac7238addab7a796817cee9d00b
-size 18299520
diff --git a/5.csv b/5.csv
deleted file mode 100644
index 3980291..0000000
--- a/5.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:212b6d6b07197921eaedaa271d781431b8bb034c5328622d63231a2967df1702
-size 22053240
diff --git a/6.csv b/6.csv
deleted file mode 100644
index 5906dc6..0000000
--- a/6.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:81f65085c6ce29a8f766244dc4b21d41d565ea3d6231b3b1c0b6739d67cd1d53
-size 25790880
diff --git a/7.csv b/7.csv
deleted file mode 100644
index df43af3..0000000
--- a/7.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:4e73ff71a37438ec216fc522d77e3902c5670e24a917d0be047ed101dbeea914
-size 29524261
diff --git a/README.md b/README.md
deleted file mode 100644
index 84b20f4..0000000
--- a/README.md
+++ /dev/null
@@ -1,48 +0,0 @@
-[![Review Assignment Due Date](https://classroom.github.com/assets/deadline-readme-button-22041afd0340ce965d47ae6ef1cefeee28c7c493a6346c4f15d667ab976d596c.svg)](https://classroom.github.com/a/uyodabcP)
-## Lab assignment: implementing MapReduce for sales data analysis with Hadoop
-# Objective
-
-Get acquainted with the concept of distributed computing using the MapReduce model as an example. Learn to build a multithreaded system for processing large amounts of data and apply it to the analysis of sales data.
-# Task description
-
-The repository contains several CSV files with sales data, for example:
-
-    transaction_id,product_id,category,price,quantity
-    1,101,electronics,300.00,2
-    2,102,books,15.00,5
-    3,101,electronics,300.00,1
-    4,103,toys,25.00,4
-    5,102,books,15.00,3
-
-You need to:
-
- * Compute the total revenue for each product category.
 * Count the total number of items sold per category.
 * Sort the categories by total revenue in descending order.
-
-Example output:
-
-    Category     Revenue  Quantity
-    electronics  900.00   3
-    books        120.00   8
-    toys         100.00   4
-
-# Requirements
-Main part:
-
 * Use Hadoop.
 * Implement MapReduce for processing the CSV files.
 * Provide parallelism in every phase:
-   * Map: process lines from the files.
-   * Shuffle/Sort: group the data by category.
-   * Reduce: compute the final values for each category.
 * Save the result to a file.
 * Ensure thread safety when working with shared data.
 * Support concurrent processing of a large number of files.
-
-Additional tasks (optional):
-
-* Add the ability to choose the analysis metric (for example, the average item price per category).
-
-# Results
-The deliverables are the code itself, a file with the results, and experimental data on how performance changes as the number of workers / file splits is varied.
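
For reference, the aggregation that the deleted README describes amounts to a group-by over the category column followed by a descending sort on revenue. The plain-Scala sketch below is illustrative only: the object name and the in-memory sample rows are invented here, and the actual submission performs the same computation with Hadoop MapReduce in app/src/main/scala/Main.scala further down.

    // Illustrative sketch only (not part of the submission): the per-category
    // aggregation and descending-revenue sort requested by the lab, on the sample rows.
    object AggregationSketch {
      final case class Row(category: String, price: BigDecimal, quantity: Long)

      def main(args: Array[String]): Unit = {
        val rows = List(
          Row("electronics", BigDecimal("300.00"), 2),
          Row("books", BigDecimal("15.00"), 5),
          Row("electronics", BigDecimal("300.00"), 1),
          Row("toys", BigDecimal("25.00"), 4),
          Row("books", BigDecimal("15.00"), 3)
        )
        val totals = rows
          .groupBy(_.category)
          .map { case (category, rs) =>
            val revenue  = rs.map(r => r.price * BigDecimal(r.quantity)).sum // total revenue per category
            val quantity = rs.map(_.quantity).sum                            // total items sold per category
            (category, revenue, quantity)
          }
          .toList
          .sortBy { case (_, revenue, _) => revenue } // ascending by revenue ...
          .reverse                                    // ... flipped to descending
        println(f"${"Category"}%-12s ${"Revenue"}%-9s ${"Quantity"}%-9s")
        totals.foreach { case (category, revenue, quantity) =>
          println(f"$category%-12s $revenue%-9s $quantity%-9s")
        }
        // Prints: electronics 900.00 3, books 120.00 8, toys 100.00 4
      }
    }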
diff --git a/app/.gitignore b/app/.gitignore
new file mode 100644
index 0000000..9e79245
--- /dev/null
+++ b/app/.gitignore
@@ -0,0 +1,32 @@
+# macOS
+.DS_Store
+
+# sbt specific
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+project/local-plugins.sbt
+.history
+.ensime
+.ensime_cache/
+.sbt-scripted/
+local.sbt
+
+# Bloop
+.bsp
+
+# VS Code
+.vscode/
+
+# Metals
+.bloop/
+.metals/
+metals.sbt
+
+# IDEA
+.idea
+.idea_modules
+/.worksheet/
diff --git a/app/.scalafmt.conf b/app/.scalafmt.conf
new file mode 100644
index 0000000..2025b30
--- /dev/null
+++ b/app/.scalafmt.conf
@@ -0,0 +1,14 @@
+version = "3.10.0"
+runner.dialect = scala3
+runner.dialectOverride.allowSignificantIndentation = false
+maxColumn = 160
+indent.fewerBraces = never
+rewrite.rules = [RedundantBraces]
+rewrite.scala3.removeOptionalBraces.enabled = false
+rewrite.scala3.removeOptionalBraces = false
+rewrite.insertBraces.minLines = 0
+rewrite.insertBraces.allBlocks = true
+newlines.source = unfold
+danglingParentheses.callSite = true
+danglingParentheses.defnSite = true
+danglingParentheses.ctrlSite = true
diff --git a/app/README.md b/app/README.md
new file mode 100644
index 0000000..102c5ca
--- /dev/null
+++ b/app/README.md
@@ -0,0 +1,8 @@
+## sbt project compiled with Scala 3
+
+### Usage
+
+This is a normal sbt project. You can compile code with `sbt compile`, run it with `sbt run`, and `sbt console` will start a Scala 3 REPL.
+
+For more information on the sbt-dotty plugin, see the
+[scala3-example-project](https://github.com/scala/scala3-example-project/blob/main/README.md).
diff --git a/app/build.sbt b/app/build.sbt
new file mode 100644
index 0000000..2113202
--- /dev/null
+++ b/app/build.sbt
@@ -0,0 +1,31 @@
+val scala3Version = "3.7.4"
+val hadoopVersion = "3.4.1"
+//val sparkVersion = "3.5.7"
+
+lazy val root = project
+  .in(file("."))
+  .settings(
+    name := "App",
+    version := "1.0.0",
+    assembly / mainClass := Some("App"),
+    assembly / assemblyJarName := "app.jar",
+    ThisBuild / assemblyMergeStrategy := {
+      case x => MergeStrategy.first
+    },
+    scalaVersion := scala3Version,
+    scalacOptions ++= Seq("-unchecked", "-deprecation"),
+    scalacOptions ++= Seq("-java-output-version", "8"),
+    javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
+
+    libraryDependencies ++= Seq(
+      "com.lihaoyi" %% "upickle" % "4.4.1",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-app" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-client-api" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-hdfs-client" % hadoopVersion % "provided",
+      //("org.apache.spark" %% "spark-core" % sparkVersion).cross(CrossVersion.for3Use2_13),
+      //("org.apache.spark" %% "spark-streaming" % sparkVersion).cross(CrossVersion.for3Use2_13),
+    ),
+  )
diff --git a/app/project/build.properties b/app/project/build.properties
new file mode 100644
index 0000000..01a16ed
--- /dev/null
+++ b/app/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.11.7
diff --git a/app/project/plugins.sbt b/app/project/plugins.sbt
new file mode 100644
index 0000000..f8ea5d0
--- /dev/null
+++ b/app/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")
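
One remark on the build above: the catch-all assemblyMergeStrategy keeps the first copy of every conflicting file, including META-INF entries. A slightly more explicit variant is sketched below; it is not part of the submission, only a commonly used pattern for sbt-assembly, and the submission's simpler strategy works as well for this job.

    // Sketch of a more explicit merge strategy for build.sbt (illustrative only).
    ThisBuild / assemblyMergeStrategy := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop duplicate manifests/signatures
      case _                             => MergeStrategy.first   // keep the first copy of anything else
    }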
diff --git a/app/src/main/scala/Main.scala b/app/src/main/scala/Main.scala
new file mode 100644
index 0000000..41270bf
--- /dev/null
+++ b/app/src/main/scala/Main.scala
@@ -0,0 +1,149 @@
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.Mapper
+import org.apache.hadoop.mapreduce.Reducer
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+
+import org.apache.hadoop.fs.FileSystem
+import java.io.IOException
+
+import scala.jdk.CollectionConverters.*
+import scala.annotation.static
+
+import upickle.default.{ReadWriter => RW, macroRW}
+import upickle._
+
+case class Transaction(var transaction_id: Long, var product_id: Long, var category: String, var price: Long, var quantity: Long) {}
+object Transaction {
+  implicit val rw: RW[Transaction] = macroRW
+}
+
+case class CategoryData(var category: String, var revenue: Long, var quantity: Long) {}
+object CategoryData {
+  implicit val rw: RW[CategoryData] = macroRW
+}
+
+class Phase1Mapper extends Mapper[Object, Text, Text, Text] {
+  private def parse_price(str: String): Long = (str.toDouble * 1000).toLong
+
+  override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, Text]#Context): Unit =
+    if (value != null && !value.toString().contains("transaction_id,product_id,category,price,quantity")) then {
+      val data: Array[String] = value.toString().split(",");
+      val transaction_id: Long = data(0).toLong;
+      val product_id: Long = data(1).toLong;
+      val category: String = data(2);
+      val price: Long = parse_price(data(3))
+      val quantity: Long = data(4).toLong;
+
+      context.write(Text(category), Text(upickle.write(Transaction(transaction_id, product_id, category, price, quantity))));
+    }
+}
+
+class Phase1Combiner extends Reducer[Text, Text, Text, Text] {
+  override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
+    var revenue: Long = 0;
+    var quantity: Long = 0;
+    for valuesrc <- values.asScala do {
+      val value = upickle.read[Transaction](valuesrc.toString())
+      revenue = revenue + value.quantity * value.price;
+      quantity = quantity + value.quantity;
+    }
+
+    context.write(key, Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
+  }
+}
+
+class Phase1Reducer extends Reducer[Text, Text, LongWritable, Text] {
+  override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, LongWritable, Text]#Context): Unit = {
+    var revenue: Long = 0;
+    var quantity: Long = 0;
+    for valuesrc <- values.asScala do {
+      val value = upickle.read[CategoryData](valuesrc.toString())
+      revenue = revenue + value.revenue;
+      quantity = quantity + value.quantity;
+    }
+
+    context.write(LongWritable(revenue), Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
+  }
+}
+
+class App;
+object App {
+  @static
+  def main(args: Array[String]): Unit = {
+    val input_dir = Path("/data");
+    val output_dir = Path("/results");
+
+    val jobconf = Configuration();
+    val fs = FileSystem.get(jobconf);
+    if (fs.exists(output_dir)) then {
+      fs.delete(output_dir, true);
+    }
+
+    val job1 = Job.getInstance(jobconf, "Lab3");
+    job1.setJarByClass(classOf[App]);
+    job1.setInputFormatClass(classOf[TextInputFormat]);
+    job1.setMapperClass(classOf[Phase1Mapper]);
+    job1.setMapOutputKeyClass(classOf[Text]);
+    job1.setMapOutputValueClass(classOf[Text]);
+    job1.setCombinerClass(classOf[Phase1Combiner]);
+    job1.setReducerClass(classOf[Phase1Reducer]);
+    job1.setOutputKeyClass(classOf[LongWritable]);
+    job1.setOutputValueClass(classOf[Text]);
+    FileInputFormat.addInputPath(job1, input_dir);
+    FileOutputFormat.setOutputPath(job1, output_dir);
+    job1.waitForCompletion(true);
+  }
+}
+
+type Mapper2 = Mapper[Object, Text, LongWritable, Text];
+class Phase2Mapper extends Mapper2 {
+  override def map(key: Object, value: Text, context: Mapper2#Context): Unit = {
+    val strs = value.toString().split("\\s").splitAt(1).toList.map(_.mkString)
+    context.write(LongWritable(strs(0).toLong * -1), Text(strs(1)));
+  }
+}
+
+type Reducer2 = Reducer[LongWritable, Text, Void, Text]
+class Phase2Reducer extends Reducer2 {
+  override def reduce(key: LongWritable, values: java.lang.Iterable[Text], context: Reducer2#Context): Unit =
+    for value <- values.asScala do {
+      val obj = upickle.read[CategoryData](value.toString());
+      context.write(null, Text(f"${obj.category}%-20s ${obj.revenue}%-20s ${obj.quantity}%-20s"));
+    }
+}
+
+class App2;
+object App2 {
+  @static
+  def main(args: Array[String]): Unit = {
+    val input_dir = Path("/results");
+    val output_dir = Path("/results2");
+
+    val jobconf = Configuration();
+    val fs = FileSystem.get(jobconf);
+    if (fs.exists(output_dir)) then {
+      fs.delete(output_dir, true);
+    }
+
+    val job = Job.getInstance(jobconf, "Lab3Phase2");
+    job.setJarByClass(classOf[App]);
+    job.setInputFormatClass(classOf[TextInputFormat]);
+    job.setMapperClass(classOf[Phase2Mapper]);
+    job.setMapOutputKeyClass(classOf[LongWritable]);
+    job.setMapOutputValueClass(classOf[Text]);
+    job.setReducerClass(classOf[Phase2Reducer]);
+    job.setOutputKeyClass(classOf[Void]);
+    job.setOutputValueClass(classOf[Text]);
+    FileInputFormat.addInputPath(job, input_dir);
+    FileOutputFormat.setOutputPath(job, output_dir);
+    job.waitForCompletion(true);
+  }
+}
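
Main.scala implements the assignment as two MapReduce jobs. Phase 1 (Phase1Mapper, Phase1Combiner, Phase1Reducer, driven by App) parses each CSV line, stores the price as a fixed-point Long in thousandths (parse_price multiplies by 1000 to avoid floating-point drift), aggregates revenue and quantity per category, and JSON-encodes the intermediate records with upickle. Phase 2 (Phase2Mapper, Phase2Reducer, driven by App2) re-reads the tab-separated phase-1 output and negates the revenue key, so Hadoop's ascending sort of LongWritable keys yields categories in descending order of revenue. Note that the printed Revenue column stays in the fixed-point units (for example, 900.00 appears as 900000) unless it is converted back. Below is a minimal sketch of a driver that runs the two phases back to back in one JVM; it is illustrative only (the submission exposes App and App2 as separate entry points, which may instead be submitted as two separate YARN applications) and it does not check job success between the phases.

    // Illustrative sketch only: chain the two existing entry points in one JVM.
    // Not part of the patch; assumes the Hadoop configuration on the classpath is
    // the same one App and App2 would see when submitted individually.
    object ChainedDriver {
      def main(args: Array[String]): Unit = {
        App.main(args)  // phase 1: per-category revenue/quantity written to /results
        App2.main(args) // phase 2: descending sort via negated keys, written to /results2
      }
    }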
diff --git a/config b/config
new file mode 100644
index 0000000..8909f35
--- /dev/null
+++ b/config
@@ -0,0 +1,27 @@
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..07b8f72
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,38 @@
+services:
+  namenode:
+    container_name: namenode
+    image: apache/hadoop:3.4.1
+    hostname: namenode
+    command: ["hdfs", "namenode"]
+    ports:
+      - 9870:9870
+    env_file:
+      - ./config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+  datanode:
+    container_name: datanode
+    image: apache/hadoop:3.4.1
+    command: ["hdfs", "datanode"]
+    volumes:
+      - ./app/target/scala-3.7.4/:/app/
+      - ./data/:/data/
+      - ./results/:/results/
+    env_file:
+      - ./config
+  resourcemanager:
+    container_name: resourcemanager
+    image: apache/hadoop:3.4.1
+    hostname: resourcemanager
+    command: ["yarn", "resourcemanager"]
+    ports:
+      - 8088:8088
+    env_file:
+      - ./config
+  nodemanager:
+    container_name: nodemanager
+    image: apache/hadoop:3.4.1
+    command: ["yarn", "nodemanager"]
+    env_file:
+      - ./config
+
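
The config file appears to follow the convention of the stock apache/hadoop images, where each CORE-SITE.XML_*, HDFS-SITE.XML_*, MAPRED-SITE.XML_* and YARN-SITE.XML_* entry is turned into the corresponding property of the generated *-site.xml inside every container, so all services agree that fs.defaultFS is hdfs://namenode and that MapReduce runs on YARN. The compose file mounts ./app/target/scala-3.7.4/, ./data/ and ./results/ into the datanode container, presumably so the assembled app.jar and the CSV inputs can be copied onto HDFS and the results copied back out. The snippet below is a hypothetical smoke test, not part of the patch and with all names invented here; it could be run inside one of the containers to confirm that the driver's Configuration resolves paths such as /data and /results on HDFS rather than on the local file system.

    // Hypothetical check, not part of the submission.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object FsSmokeTest {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()                // loads core-site.xml generated from `config`
        val fs = FileSystem.get(conf)
        println(s"default FS: ${fs.getUri}")          // expected: hdfs://namenode
        println(s"/data exists: ${fs.exists(new Path("/data"))}")
      }
    }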