Empty file removed .github/.keep
Empty file.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
data/
results/
2 changes: 2 additions & 0 deletions .lfsconfig
@@ -0,0 +1,2 @@
[lfs]
url = https://github.com/AdvancedJavaLabs/lab4-parallel.git/info/lfs
3 changes: 0 additions & 3 deletions 0.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 1.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 2.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 3.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 4.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 5.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 6.csv

This file was deleted.

3 changes: 0 additions & 3 deletions 7.csv

This file was deleted.

47 changes: 0 additions & 47 deletions README.md

This file was deleted.

32 changes: 32 additions & 0 deletions app/.gitignore
@@ -0,0 +1,32 @@
# macOS
.DS_Store

# sbt specific
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/
project/local-plugins.sbt
.history
.ensime
.ensime_cache/
.sbt-scripted/
local.sbt

# Bloop
.bsp

# VS Code
.vscode/

# Metals
.bloop/
.metals/
metals.sbt

# IDEA
.idea
.idea_modules
/.worksheet/
14 changes: 14 additions & 0 deletions app/.scalafmt.conf
@@ -0,0 +1,14 @@
version = "3.10.0"
runner.dialect = scala3
runner.dialectOverride.allowSignificantIndentation = false
maxColumn = 160
indent.fewerBraces = never
rewrite.rules = [RedundantBraces]
rewrite.scala3.removeOptionalBraces.enabled = false
rewrite.scala3.removeOptionalBraces = false
rewrite.insertBraces.minLines = 0
rewrite.insertBraces.allBlocks = true
newlines.source = unfold
danglingParentheses.callSite = true
danglingParentheses.defnSite = true
danglingParentheses.ctrlSite = true
8 changes: 8 additions & 0 deletions app/README.md
@@ -0,0 +1,8 @@
## sbt project compiled with Scala 3

### Usage

This is a normal sbt project. You can compile code with `sbt compile`, run it with `sbt run`, and `sbt console` will start a Scala 3 REPL.

For more information on using Scala 3 with sbt, see the
[scala3-example-project](https://github.com/scala/scala3-example-project/blob/main/README.md).
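
For this project specifically, the REPL is also a convenient place to sanity-check the CSV handling used by the MapReduce job. A minimal sketch (hypothetical input row; 12.50 is chosen so the fixed-point conversion is exact):

```scala
// Paste into the REPL opened by `sbt console`.
// Columns follow the header assumed in Main.scala: transaction_id,product_id,category,price,quantity
val cols = "1,42,electronics,12.50,3".split(",")

// Phase1Mapper stores prices as thousandths of a unit: (str.toDouble * 1000).toLong
val priceThousandths = (cols(3).toDouble * 1000).toLong // 12500
val quantity = cols(4).toLong                           // 3
```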
31 changes: 31 additions & 0 deletions app/build.sbt
@@ -0,0 +1,31 @@
val scala3Version = "3.7.4"
val hadoopVersion = "3.4.1"
//val sparkVersion = "3.5.7"

lazy val root = project
.in(file("."))
.settings(
name := "App",
version := "1.0.0",
assembly / mainClass := Some("App"),
assembly / assemblyJarName := "app.jar",
ThisBuild / assemblyMergeStrategy := {
case x => MergeStrategy.first
},
scalaVersion := scala3Version,
scalacOptions ++= Seq("-unchecked", "-deprecation"),
scalacOptions ++= Seq("-java-output-version", "8"),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8"),

libraryDependencies ++= Seq(
"com.lihaoyi" %% "upickle" % "4.4.1",
"org.apache.hadoop" % "hadoop-mapreduce-client-core" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-mapreduce-client-app" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-client-api" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion % "provided",
"org.apache.hadoop" % "hadoop-hdfs-client" % hadoopVersion % "provided",
//("org.apache.spark" %% "spark-core" % sparkVersion).cross(CrossVersion.for3Use2_13),
//("org.apache.spark" %% "spark-streaming" % sparkVersion).cross(CrossVersion.for3Use2_13),
),
)
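
One design note on the assembly settings above: collapsing every conflict to `MergeStrategy.first` is mostly harmless here because the Hadoop dependencies are marked `provided`, but it can silently pick an arbitrary copy of duplicated resources. A more selective variant often used with sbt-assembly (a sketch, not part of this change) discards META-INF metadata first:

```scala
// Hypothetical refinement of the merge strategy shown above (sbt-assembly's usual idiom).
ThisBuild / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop signature/manifest files
  case _                             => MergeStrategy.first   // otherwise keep the first copy found
}
```

Discarding all of META-INF can also remove service-loader files that some libraries need, so treat this as a starting point rather than a drop-in replacement.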
1 change: 1 addition & 0 deletions app/project/build.properties
@@ -0,0 +1 @@
sbt.version=1.11.7
1 change: 1 addition & 0 deletions app/project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.3.1")
149 changes: 149 additions & 0 deletions app/src/main/scala/Main.scala
@@ -0,0 +1,149 @@
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.Mapper
import org.apache.hadoop.mapreduce.Reducer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import org.apache.hadoop.fs.FileSystem
import java.io.IOException

import scala.jdk.CollectionConverters.*
import scala.annotation.static

import upickle.default.{ReadWriter => RW, macroRW}
import upickle._

case class Transaction(var transaction_id: Long, var product_id: Long, var category: String, var price: Long, var quantity: Long) {}
object Transaction {
implicit val rw: RW[Transaction] = macroRW
}

case class CategoryData(var category: String, var revenue: Long, var quantity: Long) {}
object CategoryData {
implicit val rw: RW[CategoryData] = macroRW
}

class Phase1Mapper extends Mapper[Object, Text, Text, Text] {
private def parse_price(str: String): Long = (str.toDouble * 1000).toLong

override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, Text]#Context): Unit =
if (value != null && !value.toString().contains("transaction_id,product_id,category,price,quantity")) then {
val data: Array[String] = value.toString().split(",");
val transaction_id: Long = data(0).toLong;
val product_id: Long = data(1).toLong;
val category: String = data(2);
val price: Long = parse_price(data(3))
val quantity: Long = data(4).toLong;

context.write(Text(category), Text(upickle.write(Transaction(transaction_id, product_id, category, price, quantity))));
}
}

class Phase1Combiner extends Reducer[Text, Text, Text, Text] {
override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {
var revenue: Long = 0;
var quantity: Long = 0;
for valuesrc <- values.asScala do {
val value = upickle.read[Transaction](valuesrc.toString())
revenue = revenue + value.quantity * value.price;
quantity = quantity + value.quantity;
}

context.write(key, Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
}
}

class Phase1Reducer extends Reducer[Text, Text, LongWritable, Text] {
override def reduce(key: Text, values: java.lang.Iterable[Text], context: Reducer[Text, Text, LongWritable, Text]#Context): Unit = {
var revenue: Long = 0;
var quantity: Long = 0;
for valuesrc <- values.asScala do {
val value = upickle.read[CategoryData](valuesrc.toString())
revenue = revenue + value.revenue;
quantity = quantity + value.quantity;
}

context.write(LongWritable(revenue), Text(upickle.write(CategoryData(key.toString(), revenue, quantity))));
}
}

class App;
object App {
@static
def main(args: Array[String]): Unit = {
val input_dir = Path("/data");
val output_dir = Path("/results");

val jobconf = Configuration();
val fs = FileSystem.get(jobconf);
if (fs.exists(output_dir)) then {
fs.delete(output_dir, true);
}

val job1 = Job.getInstance(jobconf, "Lab3");
job1.setJarByClass(classOf[App]);
job1.setInputFormatClass(classOf[TextInputFormat]);
job1.setMapperClass(classOf[Phase1Mapper]);
job1.setMapOutputKeyClass(classOf[Text]);
job1.setMapOutputValueClass(classOf[Text]);
job1.setCombinerClass(classOf[Phase1Combiner]);
job1.setReducerClass(classOf[Phase1Reducer]);
job1.setOutputKeyClass(classOf[LongWritable]);
job1.setOutputValueClass(classOf[Text]);
FileInputFormat.addInputPath(job1, input_dir);
FileOutputFormat.setOutputPath(job1, output_dir);
job1.waitForCompletion(true);
}
}

type Mapper2 = Mapper[Object, Text, LongWritable, Text];
class Phase2Mapper extends Mapper2 {
override def map(key: Object, value: Text, context: Mapper2#Context): Unit = {
val strs = value.toString().split("\\s").splitAt(1).toList.map(_.mkString)
context.write(LongWritable(strs(0).toLong * -1), Text(strs(1)));
}
}

type Reducer2 = Reducer[LongWritable, Text, Void, Text]
class Phase2Reducer extends Reducer2 {
override def reduce(key: LongWritable, values: java.lang.Iterable[Text], context: Reducer2#Context): Unit =
for value <- values.asScala do {
val obj = upickle.read[CategoryData](value.toString());
context.write(null, Text(f"${obj.category}%-20s ${obj.revenue}%-20s ${obj.quantity}%-20s"));
}
}

class App2;
object App2 {
@static
def main(args: Array[String]): Unit = {
val input_dir = Path("/results");
val output_dir = Path("/results2");

val jobconf = Configuration();
val fs = FileSystem.get(jobconf);
if (fs.exists(output_dir)) then {
fs.delete(output_dir, true);
}

val job = Job.getInstance(jobconf, "Lab3Phase2");
job.setJarByClass(classOf[App]);
job.setInputFormatClass(classOf[TextInputFormat]);
job.setMapperClass(classOf[Phase2Mapper]);
job.setMapOutputKeyClass(classOf[LongWritable]);
job.setMapOutputValueClass(classOf[Text]);
job.setReducerClass(classOf[Phase2Reducer]);
job.setOutputKeyClass(classOf[Void]);
job.setOutputValueClass(classOf[Text]);
FileInputFormat.addInputPath(job, input_dir);
FileOutputFormat.setOutputPath(job, output_dir);
job.waitForCompletion(true);
}
}
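
The two jobs communicate through plain text: phase 1 writes `revenue<TAB>CategoryData-as-JSON` lines (TextOutputFormat's default key/value separator is a tab), and Phase2Mapper re-keys each line by the negated revenue so that the shuffle's ascending key sort yields a descending-by-revenue report. A minimal standalone sketch of the JSON hand-off, assuming the `Transaction`/`CategoryData` definitions above are on the classpath (it spells out `upickle.default` explicitly):

```scala
import upickle.default.{read, write}

// Hypothetical check of the serialization used between the MapReduce phases.
@main def roundTripSketch(): Unit = {
  val t = Transaction(1L, 42L, "electronics", 12500L, 3L) // price held as thousandths (12.50 -> 12500)

  val mapValue = write(t)                  // what Phase1Mapper emits as the map-output value
  assert(read[Transaction](mapValue) == t) // what Phase1Combiner parses back

  val agg = CategoryData("electronics", t.price * t.quantity, t.quantity)
  println(write(agg)) // {"category":"electronics","revenue":37500,"quantity":3}

  // Phase2Mapper writes -revenue as the key, so Hadoop's ascending key sort
  // orders categories from highest to lowest revenue in the final output.
  println(-agg.revenue) // -37500
}
```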
27 changes: 27 additions & 0 deletions config
@@ -0,0 +1,27 @@
HADOOP_HOME=/opt/hadoop
CORE-SITE.XML_fs.default.name=hdfs://namenode
CORE-SITE.XML_fs.defaultFS=hdfs://namenode
HDFS-SITE.XML_dfs.namenode.rpc-address=namenode:8020
HDFS-SITE.XML_dfs.replication=1
MAPRED-SITE.XML_mapreduce.framework.name=yarn
MAPRED-SITE.XML_yarn.app.mapreduce.am.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
MAPRED-SITE.XML_mapreduce.map.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
MAPRED-SITE.XML_mapreduce.reduce.env=HADOOP_MAPRED_HOME=$HADOOP_HOME
YARN-SITE.XML_yarn.resourcemanager.hostname=resourcemanager
YARN-SITE.XML_yarn.nodemanager.pmem-check-enabled=false
YARN-SITE.XML_yarn.nodemanager.delete.debug-delay-sec=600
YARN-SITE.XML_yarn.nodemanager.vmem-check-enabled=false
YARN-SITE.XML_yarn.nodemanager.aux-services=mapreduce_shuffle
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-applications=10000
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.maximum-am-resource-percent=0.1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.queues=default
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.user-limit-factor=1
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.maximum-capacity=100
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.state=RUNNING
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_submit_applications=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.root.default.acl_administer_queue=*
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.node-locality-delay=40
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings=
CAPACITY-SCHEDULER.XML_yarn.scheduler.capacity.queue-mappings-override.enable=false
38 changes: 38 additions & 0 deletions docker-compose.yaml
@@ -0,0 +1,38 @@
services:
namenode:
container_name: namenode
image: apache/hadoop:3.4.1
hostname: namenode
command: ["hdfs", "namenode"]
ports:
- 9870:9870
env_file:
- ./config
environment:
ENSURE_NAMENODE_DIR: "/tmp/hadoop-root/dfs/name"
datanode:
container_name: datanode
image: apache/hadoop:3.4.1
command: ["hdfs", "datanode"]
volumes:
- ./app/target/scala-3.7.4/:/app/
- ./data/:/data/
- ./results/:/results/
env_file:
- ./config
resourcemanager:
container_name: resourcemanager
image: apache/hadoop:3.4.1
hostname: resourcemanager
command: ["yarn", "resourcemanager"]
ports:
- 8088:8088
env_file:
- ./config
nodemanager:
container_name: nodemanager
image: apache/hadoop:3.4.1
command: ["yarn", "nodemanager"]
env_file:
- ./config