-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSparkSQL - Airline Data
66 lines (53 loc) · 2.58 KB
/
SparkSQL - Airline Data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
Advanced Big Data - Assignment 2
Keerthi R Bollam
Problem Statement
http://stat-computing.org/dataexpo/2009/ consists of flight arrival and departure details for all
commercial flights within the USA, from October 1987 to April 2008. This is a large dataset: there
are nearly 120 million records in total, and takes up 1.6 gigabytes of space compressed and 12
gigabytes when uncompressed.
You may download the data of only 1 year for analysis. For Ex 2007 data is available at: http://statcomputing.org/dataexpo/2009/2007.csv.bz2.
Write sample code (Spark SQL) that finds:
1. Airports that have > 70% delay
2. Airports that have > 50% delay
Output must be Origin and % delay
Use the following and anything else required in the input:
• Origin for Airport Code
• DepDelay to find if a flight had delay
• Do not consider the Cancelled flights
Solution:
1. Start Hadoop Processes and Initiate Spark Shell
start-dfs.sh
start-yarn.sh
spark-shell
2. Importing the CSV file into scala and Removing header
scala> import org.apache.spark.rdd.RDD
val AirlineFile = "file:///home/hduser/Datasets/flights2007.csv"
import au.com.bytecode.opencsv.CSVParser
val FlightData = sc.textFile(AirlineFile)
scala> def dropHeader(data: RDD[String]): RDD[String] = {
data.mapPartitionsWithIndex((idx, lines) => {
if (idx == 0) {
lines.drop(1)
}
lines
})
}
scala> val withoutHeader: RDD[String] = dropHeader(FlightData)
3. Converting RDD to Dataframe by giving schema and filtering out the cancelled flights and NA values
scala> import org.apache.spark.sql._
import sqlContext._
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("DepDelay",StringType,true),StructField("Origin",StringType,true), StructField("Cancelled",StringType,true)))
val rowRDD = withoutHeader.map(_.split(",")).map(p => Row(p(15), p(16), p(21).trim))
var FlightDataFrame = sqlContext.createDataFrame(rowRDD, schema)
FlightDataFrame = FlightDataFrame.filter(FlightDataFrame("Cancelled")!== "1")
FlightDataFrame = FlightDataFrame.filter(FlightDataFrame("DepDelay")!== "NA")
FlightDataFrame.groupBy("Origin").count().show()
FlightDataFrame.withColumn("DepDelay", 'DepDelay.cast(DoubleType))
4. Calculations for the final output as per the requirement
val result=FlightDataFrame.groupBy("Origin").agg(((sum(when(col("DepDelay") > 0.0,1).otherwise(0))/
count("DepDelay"))*100).alias("%Delay"))
##output for %Delay > 70:
result.filter(result("%Delay")>70).sort("%Delay").show()
##output for %Delay > 50:
result.filter(result("%Delay")>50).sort("%Delay").show()