-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathproblem16
More file actions
69 lines (54 loc) · 4.9 KB
/
problem16
File metadata and controls
69 lines (54 loc) · 4.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// It is possible that, same product can be supplied by multiple supplier. Now find each product, its price according to each supplier.
scala> case class Product(productID: Integer, productCode: String, name: String, quantity: Integer, price: Float, supplierid: Integer)
defined class Product
scala> case class Supplier(supplierid: Integer, name: String, phone: Integer)
defined class Supplier
scala> case class Products_Suppliers(productId: Integer, supplierId: Integer)
defined class Products_Suppliers
scala> val products = sc.textFile("/user/smartlin1/data/product.csv").map(_.split(",")).map(p => Product(p(0).toInt, p(1), p(2), p(3).toInt, p(4).toFloat, p(5).toInt)).toDF()
17/10/10 18:32:02 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 336.8 KB, free 1432.3 KB)
17/10/10 18:32:02 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 28.4 KB, free 1460.7 KB)
17/10/10 18:32:02 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:55382 (size: 28.4 KB, free: 511.0 MB)
17/10/10 18:32:02 INFO SparkContext: Created broadcast 3 from textFile at <console>:29
products: org.apache.spark.sql.DataFrame = [productID: int, productCode: string, name: string, quantity: int, price: float, supplierid: int]
scala> val suppliers = sc.textFile("/user/smartlin1/data/supplier.csv").map(_.split(",")).map(p => Supplier(p(0).toInt, p(1), p(2).toInt)).toDF()
17/10/10 18:35:27 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 336.8 KB, free 1797.5 KB)
17/10/10 18:35:27 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 28.4 KB, free 1825.9 KB)
17/10/10 18:35:27 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:55382 (size: 28.4 KB, free: 511.0 MB)
17/10/10 18:35:27 INFO SparkContext: Created broadcast 4 from textFile at <console>:29
suppliers: org.apache.spark.sql.DataFrame = [supplierid: int, name: string, phone: int]
scala> val products_supplier = sc.textFile("/user/smartlin1/data/products_suppliers.csv").map(_.split(",")).map(p => Products_Suppliers(p(0).toInt, p(1).toInt)).toDF()
17/10/10 18:37:21 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 336.8 KB, free 2.5 MB)
17/10/10 18:37:21 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 28.4 KB, free 2.5 MB)
17/10/10 18:37:21 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:55382 (size: 28.4 KB, free: 510.9 MB)
17/10/10 18:37:21 INFO SparkContext: Created broadcast 6 from textFile at <console>:29
products_supplier: org.apache.spark.sql.DataFrame = [productId: int, supplierId: int]
scala> products.registerTempTable("products")
scala> suppliers.registerTempTable("suppliers")
scala> products_supplier.registerTempTable("products_suppliers")
scala> val results = sqlContext.sql("SELECT products.name AS product_name, price, suppliers.name as supplier_name FROM products_suppliers JOIN products ON products_suppliers.productID=products.productID JOIN suppliers ON products_suppliers.supplierID=suppliers.supplierID")
17/10/10 18:44:14 INFO ParseDriver: Parsing command: SELECT products.name AS product_name, price, suppliers.name as supplier_name FROM products_suppliers JOIN products ON products_suppliers.productID=products.productID JOIN suppliers ON products_suppliers.supplierID=suppliers.supplierID
17/10/10 18:44:14 INFO ParseDriver: Parse Completed
results: org.apache.spark.sql.DataFrame = [product_name: string, price: float, supplier_name: string]
//Find all the supllier name, who are supplying 'Pencil 3B'
scala> val results = sqlContext.sql("SELECT p.name AS Product_Name, s.name AS Supplier_Name FROM products_suppliers AS ps JOIN products AS p ON ps.productID = p.productID JOIN suppliers AS s on ps.supplierID = s.supplierID WHERE p.name ='Pencil 3B'")
17/10/10 19:21:19 INFO ParseDriver: Parsing command: SELECT p.name AS Product_Name, s.name AS Supplier_Name FROM products_suppliers AS ps JOIN products AS p ON ps.productID = p.productID JOIN suppliers AS s on ps.supplierID = s.supplierID WHERE p.name ='Pencil 3B'
17/10/10 19:21:19 INFO ParseDriver: Parse Completed
results: org.apache.spark.sql.DataFrame = [Product_Name: string, Supplier_Name: string]
scala> results.show()
+------------+-------------+
|Product_Name|Supplier_Name|
+------------+-------------+
| Pencil 3B| ABC Traders|
| Pencil 3B| QQ Corp|
+------------+-------------+
// Final all products supplied by ABC Traders
scala> val result = sqlContext.sql("SELECT p.name AS 'Product Name', s.name AS 'Supplier Name' FROM products AS p, products_suppliers AS ps, suppliers AS s WHERE p.productID =ps.productID AND ps.supplierID=s.supplierID AND s.name='ABC Traders'")
scala> results.show()
+------------+-------------+
|Product_Name|Supplier_Name|
+------------+-------------+
| Pencil 3B| ABC Traders|
| Pencil 4B| ABC Traders|
| Pencil 5B| ABC Traders|
+------------+-------------+