-
Notifications
You must be signed in to change notification settings - Fork 0
/
CIN_recycling_question2.txt
58 lines (51 loc) · 5.04 KB
/
CIN_recycling_question2.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// QUESTION 2:
// What are the top 10 city blocks measured by average monthly recycling program participation rates?
// For the purposes of this question, a city block is defined as addresses on the same street having numbers within the same “hundreds” place
// Consider residential addresses only and exclude any block with fewer than 10 addresses.
// import dataset
val CIN_tipData = spark.read.format("csv").option("header", true).option("inferSchema", true).load("/user/maria_dev/final/part3/CIN_Recycle_Carts_Tip_Data.csv")
.withColumn("month", month( to_date($"DATE", "MM/dd/yyy") ) )
.withColumn("year", year( to_date($"DATE", "MM/dd/yyy") ) )
import org.apache.spark.sql.expressions._ // needed for window function
val windowRank = Window.orderBy( desc("avgBlockMPR") ) // ranking window definition
import org.apache.spark.sql.functions.regexp_extract // needed for regular expressions
val addRegex = "(\\d+)\\w? (\\w+ (\\w+ ?)*)" // define regex pattern to read street and address number
val top10blocks = CIN_tipData
.filter($"OESCLASSIFICATION" === "RESIDENTIAL" && $"ADDRESSID".isNotNull && $"ADDRESS".isNotNull) // filter by question criteria and remove invalid data
.withColumn("street", regexp_extract($"ADDRESS", addRegex, 2) ) // extract street name
.withColumn("blockNo", floor( regexp_extract($"ADDRESS", addRegex, 1) / 100) * 100 ) // extract address number and calculate block number
.withColumn("block", concat($"blockNo", lit(" "), $"street") ) // concatenate block number with street name
.groupBy($"ADDRESSID", $"year", $"month", $"block") // group data by address and month
.agg( sum( $"CARTLIFTS").alias("addMonthTips") ) // aggregate sum of cart tips per month for each address
.withColumn("addMonthPart", when($"addMonthTips" >= 1, 1).otherwise(0) ) // determine monthly participation status for each address
.withColumn("addRecord", lit(1) ) // marker to track count of address records
.groupBy($"block", $"year", $"month") // group addresses by block and month
.agg( ( sum($"addMonthPart") / sum($"addRecord") ).alias("blockMonthPartRate"), sum($"addRecord").alias("nBuildings") ) // aggregate total participants divided by total addresses per block; track total number of buildings
.withColumn("blockString", concat($"block", lit(": "), $"nBuildings" ) ) // string for use in results display
.filter($"nBuildings" >= 10) // filter by question criteria
.groupBy($"blockString") // group addresses by block
.agg( avg($"blockMonthPartRate").alias("avgBlockMPR") ) // aggregate average monthly participation rates
.withColumn("rank", dense_rank.over(windowRank) ) // dense rank average monthly participation rates
.limit(10) // filter to top 10 ranked blocks
top10blocks
.show(false) // display results
##################################################################################
import org.apache.spark.sql.expressions._
windowRank: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@79c079c2
import org.apache.spark.sql.functions.regexp_extract
addRegex: String = (\d+)\w? (\w+ (\w+ ?)*)
top10blocks: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [blockString: string, avgBlockMPR: double ... 1 more field]
+-----------------------+------------------+----+
|blockString |avgBlockMPR |rank|
+-----------------------+------------------+----+
|700 WAKEFIELD DR: 15 |0.9749999999999999|1 |
|800 WAKEFIELD DR: 17 |0.9647058823529411|2 |
|100 LAFAYETTE CIR: 24 |0.9541666666666668|3 |
|1200 DEAN CT: 13 |0.9365384615384615|4 |
|6200 ROBISON RD: 25 |0.9360000000000005|5 |
|1200 CLIFF LAINE DR: 12|0.9354166666666666|6 |
|1400 OAK KNOLL DR: 16 |0.934375 |7 |
|6300 KINCAID RD: 13 |0.9211538461538462|8 |
|2100 BUDWOOD CT: 12 |0.9166666666666666|9 |
|6300 PARKMAN PL: 12 |0.9125 |10 |
+-----------------------+------------------+----+