From 28b59fecf355b999a9b65835ea30ce0ba51bf2c9 Mon Sep 17 00:00:00 2001 From: Marashov_Alexander Date: Mon, 12 Oct 2020 22:08:49 +0300 Subject: [PATCH] The first homework --- hw0/marashov/src/main/results.txt | 79 +++++++++++++ hw0/marashov/src/main/scala/GeoHash.scala | 76 +++++++++++++ hw0/marashov/src/main/scala/Main.scala | 130 ++++++++++++++++++++++ 3 files changed, 285 insertions(+) create mode 100644 hw0/marashov/src/main/results.txt create mode 100644 hw0/marashov/src/main/scala/GeoHash.scala create mode 100644 hw0/marashov/src/main/scala/Main.scala diff --git a/hw0/marashov/src/main/results.txt b/hw0/marashov/src/main/results.txt new file mode 100644 index 0000000..67571c5 --- /dev/null +++ b/hw0/marashov/src/main/results.txt @@ -0,0 +1,79 @@ +Mean ++---------------+------------------+ +| room_type| avg(price)| ++---------------+------------------+ +| Shared room| 70.13298791018998| +|Entire home/apt|211.88216032823104| +| Private room| 89.51396823968689| ++---------------+------------------+ + +Median ++---------------+------------+ +| room_type|median price| ++---------------+------------+ +| Shared room| 45.0| +|Entire home/apt| 160.0| +| Private room| 70.0| ++---------------+------------+ + +Mode ++---------------+-----+-----+ +| room_type|price|count| ++---------------+-----+-----+ +| Shared room| 35| 81| +|Entire home/apt| 150| 1615| +| Private room| 50| 1412| ++---------------+-----+-----+ + +Variance ++---------------+------------------+ +| room_type| var_samp(price)| ++---------------+------------------+ +| Shared room|10365.890682680929| +|Entire home/apt| 80852.24645965557| +| Private room|23907.680804069663| ++---------------+------------------+ + +Standard deviation ++---------------+------------------+ +| room_type|stddev_samp(price)| ++---------------+------------------+ +| Shared room|101.81301823775253| +|Entire home/apt|284.34529442151063| +| Private room|154.62108783755747| ++---------------+------------------+ + +Offer with maximal price 
++--------+--------------------+-----+
+|      id|                name|price|
++--------+--------------------+-----+
+|18750597|Huge Brooklyn Bro...|    0|
++--------+--------------------+-----+
+
+Offer with minimal price
++-------+--------------------+-----+
+|     id|                name|price|
++-------+--------------------+-----+
+|7003697|Furnished room in...|10000|
++-------+--------------------+-----+
+
+Correlation between price and minimum_nights
++---------------------------+
+|corr(price, minimum_nights)|
++---------------------------+
+|        0.04238800501413225|
++---------------------------+
+
+Correlation between price and number_of_reviews
++------------------------------+
+|corr(price, number_of_reviews)|
++------------------------------+
+|          -0.04806955416645...|
++------------------------------+
+
+The most expensive home area 5km x 5km
++-------+-----------------+--------------------+
+|geoHash|       avg(price)|         coordinates|
++-------+-----------------+--------------------+
+|  dr5x6|73.04477611940298|[40.67138671875,-...|
++-------+-----------------+--------------------+
diff --git a/hw0/marashov/src/main/scala/GeoHash.scala b/hw0/marashov/src/main/scala/GeoHash.scala
new file mode 100644
index 0000000..572a8d5
--- /dev/null
+++ b/hw0/marashov/src/main/scala/GeoHash.scala
@@ -0,0 +1,76 @@
+/**
+ * GeoHash library
+ * Downloaded from https://github.com/mumoshu/geohash-scala
+ */
+object GeoHash {
+
+  val base32 = "0123456789bcdefghjkmnpqrstuvwxyz"
+
+  def decodeBounds(geohash: String): ((Double, Double), (Double, Double)) = {
+    def toBitList(s: String) = s.flatMap {
+      c =>
+        ("00000" + base32.indexOf(c).toBinaryString).
+          reverse.take(5).reverse.map('1' ==)
+    } toList
+
+    def split(l: List[Boolean]): (List[Boolean], List[Boolean]) = {
+      l match {
+        case Nil => (Nil, Nil)
+        case x :: Nil => (x :: Nil, Nil)
+        case x :: y :: zs => val (xs, ys) = split(zs); (x :: xs, y :: ys)
+      }
+    }
+
+    def dehash(xs: List[Boolean], min: Double, max: Double): (Double, Double) = {
+      ((min, max) /: xs) {
+        case ((min, max), b) =>
+          if (b) ((min + max) / 2, max)
+          else (min, (min + max) / 2)
+      }
+    }
+
+    val (xs, ys) = split(toBitList(geohash))
+    (dehash(ys, -90, 90), dehash(xs, -180, 180))
+  }
+
+  def decode(geohash: String): (Double, Double) = {
+    decodeBounds(geohash) match {
+      case ((minLat, maxLat), (minLng, maxLng)) => ((maxLat + minLat) / 2, (maxLng + minLng) / 2)
+    }
+  }
+
+  def encode(lat: Double, lng: Double): String = encode(lat, lng, 12)
+
+  def encode(lat: Double, lng: Double, precision: Int): String = {
+
+    var (minLat, maxLat) = (-90.0, 90.0)
+    var (minLng, maxLng) = (-180.0, 180.0)
+    val bits = List(16, 8, 4, 2, 1)
+
+    (0 until precision).map { p => {
+      base32 apply (0 until 5).map { i => {
+        if (((5 * p) + i) % 2 == 0) {
+          val mid = (minLng + maxLng) / 2.0
+          if (lng > mid) {
+            minLng = mid
+            bits(i)
+          } else {
+            maxLng = mid
+            0
+          }
+        } else {
+          val mid = (minLat + maxLat) / 2.0
+          if (lat > mid) {
+            minLat = mid
+            bits(i)
+          } else {
+            maxLat = mid
+            0
+          }
+        }
+      }
+      }.reduceLeft((a, b) => a | b)
+    }
+    }.mkString("")
+  }
+}
diff --git a/hw0/marashov/src/main/scala/Main.scala b/hw0/marashov/src/main/scala/Main.scala
new file mode 100644
index 0000000..b6984fa
--- /dev/null
+++ b/hw0/marashov/src/main/scala/Main.scala
@@ -0,0 +1,130 @@
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.expressions.Window
+import org.apache.spark.sql.functions.{col, corr, desc, lit, row_number, stddev, udf, variance}
+
+object Main {
+  def main(args: Array[String]): Unit = {
+
+    val spark = SparkSession
+      .builder()
+      .appName("SparkHomework1")
+      .master("local")
+      .getOrCreate()
+
+    val sparkContext = spark.sparkContext
+    sparkContext.setLogLevel("ERROR")
+
+    val dataFrame = spark.read
+      .option("header", "true")
+      .option("mode", "DROPMALFORMED")
+      .option("escape", "\"")
+      .csv("src/main/resources/AB_NYC_2019.csv")
+
+    val castedDataFrame = dataFrame
+      .withColumn("price", dataFrame("price").cast("Long"))
+      .withColumn("latitude", dataFrame("latitude").cast("Double"))
+      .withColumn("longitude", dataFrame("longitude").cast("Double"))
+
+    println("Mean")
+    castedDataFrame
+      .groupBy("room_type")
+      .mean("price")
+      .show()
+
+    println("Median")
+    castedDataFrame.createOrReplaceTempView("dataFrame")
+    spark.sql(
+      """
+        |SELECT room_type, percentile_approx(price, 0.5) as `median price`
+        |FROM dataFrame
+        |GROUP BY room_type
+      """.stripMargin
+    ).show()
+
+    println("Mode")
+    dataFrame
+      .groupBy("room_type", "price")
+      .count()
+      .withColumn(
+        "row_number",
+        row_number()
+          .over(
+            Window.partitionBy("room_type")
+              .orderBy(desc("count"))
+          )
+      )
+      .select("room_type", "price", "count")
+      .where(col("row_number") === 1)
+      .show()
+
+    println("Variance")
+    dataFrame
+      .groupBy("room_type")
+      .agg(variance(dataFrame("price")))
+      .show()
+
+    println("Standard deviation")
+    dataFrame
+      .groupBy("room_type")
+      .agg(stddev(dataFrame("price")))
+      .show()
+
+    println("Offer with maximal price")
+    castedDataFrame
+      .orderBy(castedDataFrame("price").desc) // FIX: descending, so limit(1) keeps the highest price
+      .limit(1)
+      .select("id", "name", "price")
+      .show()
+
+    println("Offer with minimal price")
+    castedDataFrame
+      .orderBy("price") // FIX: ascending, so limit(1) keeps the lowest price
+      .limit(1)
+      .select("id", "name", "price")
+      .show()
+
+    println("Correlation between price and minimum_nights")
+    dataFrame
+      .select(
+        corr("price", "minimum_nights")
+      )
+      .show()
+
+
+    println("Correlation between price and number_of_reviews")
+    dataFrame
+      .select(
+        corr("price", "number_of_reviews")
+      )
+      .show()
+
+    val udfEncode = udf(
+      (lat: Double, lng: Double, precision: Int) => GeoHash.encode(lat, lng, precision)
+    )
+    val udfDecode = udf(
+      (geoHash: String) => GeoHash.decode(geoHash)
+    )
+
+    println("The most expensive home area 5km x 5km")
+    castedDataFrame
+      .withColumn(
+        "geoHash",
+        udfEncode(
+          castedDataFrame("latitude"),
+          castedDataFrame("longitude"),
+          lit(5)
+        ))
+      .groupBy(col("geoHash"))
+      .avg("price")
+      .orderBy(desc("avg(price)")) // FIX: descending — pick the MOST expensive cell, not the cheapest
+      .limit(1)
+      .withColumn(
+        "coordinates",
+        udfDecode(
+          col("geoHash")
+        )
+      )
+      .show()
+
+  }
+}