-
Notifications
You must be signed in to change notification settings - Fork 0
/
CIN_recycling_question3.txt
77 lines (67 loc) · 5.3 KB
/
CIN_recycling_question3.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// QUESTION 3:
// List recycling truck routes with a monthly average of cart tips greater than one standard deviation
// above the monthly average of cart tips calculated for all routes. Include residential and commercial addresses.
// Exclude routes labelled “UP” (which appear to be low-volume, supplemental routes).
// Load the cart-tip CSV (first row is the header, column types are inferred)
// and add numeric "month" and "year" columns derived from the DATE column.
// NOTE(review): the date pattern has three y's ("MM/dd/yyy"); "MM/dd/yyyy" was
// presumably intended — confirm against the data before changing it.
val tipDate = to_date($"DATE", "MM/dd/yyy") // parsed once, reused for both derived columns
val CIN_tipData = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("/user/maria_dev/final/part3/CIN_Recycle_Carts_Tip_Data.csv")
  .withColumn("month", month(tipDate))
  .withColumn("year", year(tipDate))
// Row filter shared by both aggregations below: keep only rows that have a
// classification, an address id and a route, and drop every route whose
// ROUTE_DAY matches the "<word>-UP-<word>" pattern (the "UP" routes the
// question asks to exclude).
val validNonUpRoute =
  $"OESCLASSIFICATION".isNotNull &&
  $"ADDRESSID".isNotNull &&
  $"ROUTE_DAY".isNotNull &&
  !($"ROUTE_DAY" rlike "(\\w+)-(UP)-(\\w+)")

// Total cart tips per (route, year, month). Both the per-route average and the
// all-routes mean/SD are computed from this one DataFrame, so the filter and the
// monthly totals can never drift apart between the two results.
val routeMonthTotals = CIN_tipData
  .filter(validNonUpRoute)
  .groupBy($"ROUTE_DAY", $"year", $"month") // one row per route per calendar month
  .agg( sum($"CARTLIFTS").alias("routeMonthTips") )

// calculate monthly average cart tips for each route
val avgEachRouteMonthTips = routeMonthTotals
  .groupBy($"ROUTE_DAY")
  .agg( avg($"routeMonthTips").alias("avgRouteMonthTips") )

// calculate monthly average cart tips (and their standard deviation) for all routes
// The constant "allRoutes" column collapses every row into a single group; it is
// kept so the result's column layout (allRoutes, mean, sd) stays unchanged.
val avgAllRouteMonthTips = routeMonthTotals
  .withColumn("allRoutes", lit("All Routes"))
  .groupBy($"allRoutes")
  .agg(
    avg($"routeMonthTips").alias("meanAllRouteMonthTips"),
    stddev($"routeMonthTips").alias("sdAllRouteMonthTips")
  )
// Build a column projection over avgAllRouteMonthTips in which every
// DecimalType field is cast to DoubleType and every other field is selected as-is.
// https://stackoverflow.com/a/60391887
import org.apache.spark.sql.types.{StructField, DecimalType, DoubleType}
val decimalSchema = avgAllRouteMonthTips.schema.fields.map {
  case StructField(fieldName, _: DecimalType, _, _) => col(fieldName).cast(DoubleType)
  case otherField                                   => col(otherField.name)
}
// Apply the projection so the aggregate values can be read back as Doubles.
val avgAllRouteMonthTipsDoubles = avgAllRouteMonthTips.select(decimalSchema: _*)
// Extract the all-routes mean and standard deviation of monthly cart tips as
// plain Doubles.
// `first` is an action, so the single result row is fetched once and reused
// instead of re-running the aggregation for each value.
val allRoutesStatsRow = avgAllRouteMonthTipsDoubles.first
// Look the fields up by column name rather than by position so a change in
// column order cannot silently swap the two values. `getDouble` is kept so a
// null aggregate still fails loudly instead of turning into 0.0.
val meanAllRouteMonthTipsValue =
  allRoutesStatsRow.getDouble(allRoutesStatsRow.fieldIndex("meanAllRouteMonthTips"))
val sdAllRouteMonthTipsValue =
  allRoutesStatsRow.getDouble(allRoutesStatsRow.fieldIndex("sdAllRouteMonthTips"))
// Keep only the routes whose average monthly tips exceed the all-routes mean
// by more than one standard deviation, highest average first, and give the
// columns presentation-friendly names.
val oneSdAboveMean = meanAllRouteMonthTipsValue + sdAllRouteMonthTipsValue
val heaviestRoutes = avgEachRouteMonthTips
  .filter( ($"avgRouteMonthTips" - oneSdAboveMean) > 0 )
  .orderBy( $"avgRouteMonthTips".desc )
  .withColumnRenamed("ROUTE_DAY", "Route")
  .withColumnRenamed("avgRouteMonthTips", "Average Monthly Tips")
// display results without truncating cell contents
heaviestRoutes.show(false)
##################################################################################
avgEachRouteMonthTips: org.apache.spark.sql.DataFrame = [ROUTE_DAY: string, avgRouteMonthTips: decimal(37,4)]
avgAllRouteMonthTips: org.apache.spark.sql.DataFrame = [allRoutes: string, meanAllRouteMonthTips: decimal(37,4) ... 1 more field]
import org.apache.spark.sql.types.{StructField, DecimalType, DoubleType}
decimalSchema: Array[org.apache.spark.sql.Column] = Array(allRoutes, CAST(meanAllRouteMonthTips AS DOUBLE), sdAllRouteMonthTips)
avgAllRouteMonthTipsDoubles: org.apache.spark.sql.DataFrame = [allRoutes: string, meanAllRouteMonthTips: double ... 1 more field]
meanAllRouteMonthTipsValue: Double = 580.0044
sdAllRouteMonthTipsValue: Double = 314.3280659183882
heaviestRoutes: org.apache.spark.sql.DataFrame = [Route: string, Average Monthly Tips: decimal(37,4)]
+------------+--------------------+
|Route |Average Monthly Tips|
+------------+--------------------+
|TUE-U2-GOLD |1197.4000 |
|WED-U8-GREEN|1098.6000 |
|WED-U3-GREEN|1086.9250 |
|TUE-U8-GOLD |1018.3000 |
|WED-U2-GREEN|986.6750 |
|MON-U2-GREEN|985.6250 |
|THU-U8-GOLD |949.2000 |
|TUE-U12-GOLD|896.9750 |
+------------+--------------------+