diff --git a/.github/.keep b/.github/.keep
deleted file mode 100644
index e69de29..0000000
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..2385b03
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+data/
+results/
diff --git a/.lfsconfig b/.lfsconfig
new file mode 100644
index 0000000..1315906
--- /dev/null
+++ b/.lfsconfig
@@ -0,0 +1,2 @@
+[lfs]
+	url = https://github.com/AdvancedJavaLabs/lab4-parallel.git/info/lfs
diff --git a/0.csv b/0.csv
deleted file mode 100644
index 2f7dfb7..0000000
--- a/0.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:eae61feac01cb36de9d6dd148a5b3fb76ad506fd8b283b20306df16f246e511e
-size 3406784
diff --git a/1.csv b/1.csv
deleted file mode 100644
index 3a258b0..0000000
--- a/1.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:7b2681409003d07d3ca063eb34339a0cdea9b688d16723b0e284091afd6bf806
-size 7078520
diff --git a/2.csv b/2.csv
deleted file mode 100644
index 4cdd100..0000000
--- a/2.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:ea9a50aa300dfe7e766eb419b2a963d31cfe68644acb23f654df7abe852d3a76
-size 10737171
diff --git a/3.csv b/3.csv
deleted file mode 100644
index afbe78d..0000000
--- a/3.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:6268fae7b50879a151a6f6de4852e4f39d2e22a8315290e87dcda71f4e10b866
-size 14530705
diff --git a/4.csv b/4.csv
deleted file mode 100644
index 9ff08df..0000000
--- a/4.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:b4adf0d42097f1b2c9e68731460c5a7c52cb7ac7238addab7a796817cee9d00b
-size 18299520
diff --git a/5.csv b/5.csv
deleted file mode 100644
index 3980291..0000000
--- a/5.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:212b6d6b07197921eaedaa271d781431b8bb034c5328622d63231a2967df1702
-size 22053240
diff --git a/6.csv b/6.csv
deleted file mode 100644
index 5906dc6..0000000
--- a/6.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:81f65085c6ce29a8f766244dc4b21d41d565ea3d6231b3b1c0b6739d67cd1d53
-size 25790880
diff --git a/7.csv b/7.csv
deleted file mode 100644
index df43af3..0000000
--- a/7.csv
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:4e73ff71a37438ec216fc522d77e3902c5670e24a917d0be047ed101dbeea914
-size 29524261
diff --git a/README.md b/README.md
deleted file mode 100644
index 948dc85..0000000
--- a/README.md
+++ /dev/null
@@ -1,47 +0,0 @@
-## Lab assignment: implementing MapReduce for sales-data analysis with Hadoop
-# Goal
-
-Get acquainted with distributed computing using the MapReduce model as the example. Learn to build a multithreaded system for processing large data sets and apply it to the analysis of sales data.
-# Task description
-
-The repository contains several CSV files with sales data, for example:
-
-    transaction_id,product_id,category,price,quantity
-    1,101,electronics,300.00,2
-    2,102,books,15.00,5
-    3,101,electronics,300.00,1
-    4,103,toys,25.00,4
-    5,102,books,15.00,3
-
-You need to:
-
- * Compute the total revenue for each product category.
- * Count the total number of items sold per category.
- * Sort the categories by total revenue in descending order.
-
-Example output:
-
-    Category        Revenue     Quantity
-    electronics     900.00      3
-    books           120.00      8
-    toys            100.00      4
-
-# Requirements
-Main part:
-
- * Use Hadoop.
- * Implement MapReduce processing of the CSV files.
- * Make every phase multithreaded:
-   * Map: process lines from the files.
-   * Shuffle/Sort: group the data by category.
-   * Reduce: compute the final values for each category.
- * Save the result to a file.
- * Ensure thread safety when working with shared data.
- * Support concurrent processing of a large number of files.
-
-Optional tasks (extra credit):
-
-* Add the ability to choose the analysis metric (for example, the average item price per category).
-
-# Results
-The deliverables are the code itself, the output file, and experimental data on how the runtime changes as the number of workers / the number of splits of the input is varied.
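For reference, the aggregation described in the deleted README above (total revenue and total quantity per category, sorted by revenue descending) can be sketched in a few lines of plain Scala. This is only an illustrative, hypothetical snippet over the sample rows; the actual submission below uses Hadoop MapReduce and stores prices as Long milli-units rather than Double:

    // Hypothetical, single-threaded sketch of the required per-category aggregation.
    @main def salesSummarySketch(): Unit = {
      val rows = List( // the sample CSV rows from the README, header already skipped
        "1,101,electronics,300.00,2",
        "2,102,books,15.00,5",
        "3,101,electronics,300.00,1",
        "4,103,toys,25.00,4",
        "5,102,books,15.00,3"
      )
      // category -> (revenue, quantity)
      val perCategory = rows
        .map(_.split(","))
        .groupMapReduce(cols => cols(2))(cols => (cols(3).toDouble * cols(4).toInt, cols(4).toLong)) {
          case ((r1, q1), (r2, q2)) => (r1 + r2, q1 + q2)
        }
      println(f"${"Category"}%-15s ${"Revenue"}%-12s Quantity")
      perCategory.toList.sortBy { case (_, (revenue, _)) => -revenue }.foreach {
        case (category, (revenue, quantity)) => println(f"$category%-15s $revenue%-12.2f $quantity")
      }
    }

Running it prints the same table as the README's example output (electronics 900.00 3, books 120.00 8, toys 100.00 4).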
diff --git a/app/.gitignore b/app/.gitignore
new file mode 100644
index 0000000..9e79245
--- /dev/null
+++ b/app/.gitignore
@@ -0,0 +1,32 @@
+# macOS
+.DS_Store
+
+# sbt specific
+dist/*
+target/
+lib_managed/
+src_managed/
+project/boot/
+project/plugins/project/
+project/local-plugins.sbt
+.history
+.ensime
+.ensime_cache/
+.sbt-scripted/
+local.sbt
+
+# Bloop
+.bsp
+
+# VS Code
+.vscode/
+
+# Metals
+.bloop/
+.metals/
+metals.sbt
+
+# IDEA
+.idea
+.idea_modules
+/.worksheet/
diff --git a/app/.scalafmt.conf b/app/.scalafmt.conf
new file mode 100644
index 0000000..2025b30
--- /dev/null
+++ b/app/.scalafmt.conf
@@ -0,0 +1,14 @@
+version = "3.10.0"
+runner.dialect = scala3
+runner.dialectOverride.allowSignificantIndentation = false
+maxColumn = 160
+indent.fewerBraces = never
+rewrite.rules = [RedundantBraces]
+rewrite.scala3.removeOptionalBraces.enabled = false
+rewrite.scala3.removeOptionalBraces = false
+rewrite.insertBraces.minLines = 0
+rewrite.insertBraces.allBlocks = true
+newlines.source = unfold
+danglingParentheses.callSite = true
+danglingParentheses.defnSite = true
+danglingParentheses.ctrlSite = true
diff --git a/app/README.md b/app/README.md
new file mode 100644
index 0000000..102c5ca
--- /dev/null
+++ b/app/README.md
@@ -0,0 +1,8 @@
+## sbt project compiled with Scala 3
+
+### Usage
+
+This is a normal sbt project. You can compile code with `sbt compile`, run it with `sbt run`, and `sbt console` will start a Scala 3 REPL.
+
+For more information on the sbt-dotty plugin, see the
+[scala3-example-project](https://github.com/scala/scala3-example-project/blob/main/README.md).
diff --git a/app/build.sbt b/app/build.sbt
new file mode 100644
index 0000000..2113202
--- /dev/null
+++ b/app/build.sbt
@@ -0,0 +1,31 @@
+val scala3Version = "3.7.4"
+val hadoopVersion = "3.4.1"
+//val sparkVersion = "3.5.7"
+
+lazy val root = project
+  .in(file("."))
+  .settings(
+    name := "App",
+    version := "1.0.0",
+    assembly / mainClass := Some("App"),
+    assembly / assemblyJarName := "app.jar",
+    ThisBuild / assemblyMergeStrategy := {
+      case x => MergeStrategy.first
+    },
+    scalaVersion := scala3Version,
+    scalacOptions ++= Seq("-unchecked", "-deprecation"),
+    scalacOptions ++= Seq("-java-output-version", "8"),
+    javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),
+
+    libraryDependencies ++= Seq(
+      "com.lihaoyi" %% "upickle" % "4.4.1",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-mapreduce-client-app" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-client-api" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion % "provided",
+      "org.apache.hadoop" % "hadoop-hdfs-client" % hadoopVersion % "provided",
+      //("org.apache.spark" %% "spark-core" % sparkVersion).cross(CrossVersion.for3Use2_13),
+      //("org.apache.spark" %% "spark-streaming" % sparkVersion).cross(CrossVersion.for3Use2_13),
+    ),
+  )
diff --git a/app/project/build.properties b/app/project/build.properties
new file mode 100644
index 0000000..01a16ed
--- /dev/null
+++ b/app/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.11.7
diff --git a/app/project/plugins.sbt b/app/project/plugins.sbt
new file mode 100644
index 0000000..f8ea5d0
--- /dev/null
+++ b/app/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")
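The Hadoop artifacts above are marked `provided` because the apache/hadoop containers (see the docker-compose file further down) already ship them on the classpath; only upickle needs to be bundled. Presumably the fat jar is built with the sbt-assembly plugin added here, i.e. by running `sbt assembly` inside `app/`; with `assemblyJarName := "app.jar"` the artifact should land at `app/target/scala-3.7.4/app.jar`, which is exactly the directory docker-compose mounts into the datanode container as `/app/`.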
diff --git a/app/src/main/scala/Main.scala b/app/src/main/scala/Main.scala
new file mode 100644
index 0000000..41270bf
--- /dev/null
+++ b/app/src/main/scala/Main.scala
@@ -0,0 +1,149 @@
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.Mapper
+import org.apache.hadoop.mapreduce.Reducer
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
+
+import org.apache.hadoop.fs.FileSystem
+import java.io.IOException
+
+import scala.jdk.CollectionConverters.*
+import scala.annotation.static
+
+import upickle.default.{ReadWriter => RW, macroRW}
+import upickle._
+
+case class Transaction(var transaction_id: Long, var product_id: Long, var category: String, var price: Long, var quantity: Long) {}
+object Transaction {
+  implicit val rw: RW[Transaction] = macroRW
+}
+
+case class CategoryData(var category: String, var revenue: Long, var quantity: Long) {}
+object CategoryData {
+  implicit val rw: RW[CategoryData] = macroRW
+}
+
+class Phase1Mapper extends Mapper[Object, Text, Text, Text] {
+  private def parse_price(str: String): Long = (str.toDouble * 1000).toLong
+
+  override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, Text]#Context): Unit =
+    if (value != null && !value.toString().contains("transaction_id,product_id,category,price,quantity")) then {
+      val data: Array[String] = value.toString().split(",");
+      val transaction_id: Long = data(0).toLong;
+      val product_id: Long = data(1).toLong;
+      val category: String = data(2);
+      val price: Long = parse_price(data(3))
+      val quantity: Long = data(4).toLong;
+
+      context.write(Text(category), Text(upickle.write(Transaction(transaction_id, product_id, category, price, quantity))));
+    }
+}
+
+class Phase1Combiner extends Reducer[Text, Text, Text, Text] {
+  override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
+    var revenue: Long = 0;
+    var quantity: Long = 0;
+    for valuesrc <- values.asScala do {
+      val value = upickle.read[Transaction](valuesrc.toString())
+      revenue = revenue + value.quantity * value.price;
+      quantity = quantity + value.quantity;
+    }
+
+    context.write(key, Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
+  }
+}
+
+class Phase1Reducer extends Reducer[Text, Text, LongWritable, Text] {
+  override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, LongWritable, Text]#Context): Unit = {
+    var revenue: Long = 0;
+    var quantity: Long = 0;
+    for valuesrc <- values.asScala do {
+      val value = upickle.read[CategoryData](valuesrc.toString())
+      revenue = revenue + value.revenue;
+      quantity = quantity + value.quantity;
+    }
+
+    context.write(LongWritable(revenue), Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
+  }
+}
+
+class App;
+object App {
+  @static
+  def main(args: Array[String]): Unit = {
+    val input_dir = Path("/data");
+    val output_dir = Path("/results");
+
+    val jobconf = Configuration();
+    val fs = FileSystem.get(jobconf);
+    if (fs.exists(output_dir)) then {
+      fs.delete(output_dir, true);
+    }
+
+    val job1 = Job.getInstance(jobconf, "Lab3");
+    job1.setJarByClass(classOf[App]);
+    job1.setInputFormatClass(classOf[TextInputFormat]);
+    job1.setMapperClass(classOf[Phase1Mapper]);
+    job1.setMapOutputKeyClass(classOf[Text]);
+    job1.setMapOutputValueClass(classOf[Text]);
+    job1.setCombinerClass(classOf[Phase1Combiner]);
+    job1.setReducerClass(classOf[Phase1Reducer]);
+    job1.setOutputKeyClass(classOf[LongWritable]);
+    job1.setOutputValueClass(classOf[Text]);
+    FileInputFormat.addInputPath(job1, input_dir);
+    FileOutputFormat.setOutputPath(job1, output_dir);
+    job1.waitForCompletion(true);
+  }
+}
+
+type Mapper2 = Mapper[Object, Text, LongWritable, Text];
+class Phase2Mapper extends Mapper2 {
+  override def map(key: Object, value: Text, context: Mapper2#Context): Unit = {
+    val strs = value.toString().split("\\s").splitAt(1).toList.map(_.mkString)
+    context.write(LongWritable(strs(0).toLong * -1), Text(strs(1)));
+  }
+}
+
+type Reducer2 = Reducer[LongWritable, Text, Void, Text]
+class Phase2Reducer extends Reducer2 {
+  override def reduce(key: LongWritable, values: java.lang.Iterable[Text], context: Reducer2#Context): Unit =
+    for value <- values.asScala do {
+      val obj = upickle.read[CategoryData](value.toString());
+      context.write(null, Text(f"${obj.category}%-20s ${obj.revenue}%-20s ${obj.quantity}%-20s"));
+    }
+}
+
+class App2;
+object App2 {
+  @static
+  def main(args: Array[String]): Unit = {
+    val input_dir = Path("/results");
+    val output_dir = Path("/results2");
+
+    val jobconf = Configuration();
+    val fs = FileSystem.get(jobconf);
+    if (fs.exists(output_dir)) then {
+      fs.delete(output_dir, true);
+    }
+
+    val job = Job.getInstance(jobconf, "Lab3Phase2");
+    job.setJarByClass(classOf[App]);
+    job.setInputFormatClass(classOf[TextInputFormat]);
+    job.setMapperClass(classOf[Phase2Mapper]);
+    job.setMapOutputKeyClass(classOf[LongWritable]);
+    job.setMapOutputValueClass(classOf[Text]);
+    job.setReducerClass(classOf[Phase2Reducer]);
+    job.setOutputKeyClass(classOf[Void]);
+    job.setOutputValueClass(classOf[Text]);
+    FileInputFormat.addInputPath(job, input_dir);
+    FileOutputFormat.setOutputPath(job, output_dir);
+    job.waitForCompletion(true);
+  }
+}
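One detail of Main.scala worth spelling out is the ordering trick: Hadoop sorts map output keys in ascending order, so Phase2Mapper re-parses each `revenue<TAB>json` line written by Phase1Reducer and emits the negated revenue as the LongWritable key, which makes the categories arrive at Phase2Reducer in descending order of revenue. A small self-contained sketch of that parsing step (hypothetical demo code, not part of the job; the revenue is in milli-units because Phase1Mapper stores price as `(price * 1000).toLong`):

    // Hypothetical demo of the Phase2Mapper key trick on one Phase1Reducer output line.
    @main def phase2KeyDemo(): Unit = {
      // TextOutputFormat writes "<key><TAB><value>", here "<revenue>\t<CategoryData as JSON>".
      val line = "900000\t" + """{"category":"electronics","revenue":900000,"quantity":3}"""
      // Same parsing as Phase2Mapper: the first whitespace-separated token is the revenue,
      // everything after it is the JSON payload.
      val strs = line.split("\\s").splitAt(1).toList.map(_.mkString)
      // Negating the revenue turns Hadoop's ascending key sort into "largest revenue first".
      val shuffleKey = strs(0).toLong * -1
      println(s"shuffle key: $shuffleKey")   // prints: shuffle key: -900000
      println(s"value: ${strs(1)}")          // prints the untouched JSON payload
    }

An alternative would be a custom descending sort comparator on the second job; negating the key keeps the code smaller.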
diff --git a/config b/config
new file mode 100644
index 0000000..8909f35
--- /dev/null
+++ b/config
@@ -0,0 +1,27 @@
+HADOOP_HOME=/opt/hadoop
+CORE-SITE.XML_fs.default.name=hdfs://namenode
+CORE-SITE.XML_fs.defaultFS=hdfs://namenode
+HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
+HDFS-SITE.XML_dfs.replication=1
+MAPRED-SITE.XML_mapreduce.framework.name=yarn
+MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
+YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
+YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
+YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
+YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
+CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..07b8f72
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,38 @@
+services:
+  namenode:
+    container_name: namenode
+    image: apache/hadoop:3.4.1
+    hostname: namenode
+    command: ["hdfs", "namenode"]
+    ports:
+      - 9870:9870
+    env_file:
+      - ./config
+    environment:
+      ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
+  datanode:
+    container_name: datanode
+    image: apache/hadoop:3.4.1
+    command: ["hdfs", "datanode"]
+    volumes:
+      - ./app/target/scala-3.7.4/:/app/
+      - ./data/:/data/
+      - ./results/:/results/
+    env_file:
+      - ./config
+  resourcemanager:
+    container_name: resourcemanager
+    image: apache/hadoop:3.4.1
+    hostname: resourcemanager
+    command: ["yarn", "resourcemanager"]
+    ports:
+      - 8088:8088
+    env_file:
+      - ./config
+  nodemanager:
+    container_name: nodemanager
+    image: apache/hadoop:3.4.1
+    command: ["yarn", "nodemanager"]
+    env_file:
+      - ./config
+
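Nothing in this change records the exact commands used to drive the cluster, but the pieces fit together as follows. docker-compose starts a one-of-each Hadoop setup (namenode, datanode, resourcemanager, nodemanager) configured through the `config` env file; the apache/hadoop image is set up to expand variables of the `CORE-SITE.XML_key=value` / `YARN-SITE.XML_key=value` form into the corresponding *-site.xml entries. The datanode container gets the assembled jar at `/app/`, the input CSVs at `/data/`, and a host-mounted `/results/` directory. The first job (main class `App`, per `mainClass` in build.sbt) reads HDFS `/data` and writes `/results`; the second (`App2`) reads `/results` and writes the final revenue-descending listing to `/results2`. Presumably the CSVs are first copied into HDFS (for example with `hdfs dfs -put` from inside the datanode container) and the jobs are then submitted with something like `yarn jar /app/app.jar App` followed by `yarn jar /app/app.jar App2`; the `@static` annotation on both `main` methods exists precisely so that the `App` and `App2` classes expose a static `main` the Hadoop launcher can call.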