-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathRetailSalesIngestion.pig
35 lines (32 loc) · 1.89 KB
/
RetailSalesIngestion.pig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- Remove the two output locations this script writes to (the cleansed sales
-- data and the per-country revenue result) before the pipeline runs.
rmf /user/admin/retail/retailsalesclean
rmf /user/admin/retail/georevenue
-- Read the raw retail sales extract as tab-delimited text and declare the
-- name and type of each of its eight columns.
-- NOTE(review): Pig loads a field as null when its text cannot be cast to the
-- declared type, so any non-numeric InvoiceNo or CustomerID value would be
-- blanked here. Confirm the source file never contains such values.
InputFile = LOAD '/user/admin/retail/retailsalesraw/OnlineRetail.txt'
    USING PigStorage('\t')
    AS (
        InvoiceNo:   int,
        StockCode:   chararray,
        Description: chararray,
        Quantity:    int,
        InvoiceDate: chararray,
        UnitPrice:   float,
        CustomerID:  int,
        Country:     chararray
    );
-- Step 1: discard the header record, i.e. the row whose InvoiceDate column
-- holds the literal text 'InvoiceDate' instead of a value.
-- NOTE(review): if InvoiceDate is null the condition evaluates to null, so
-- that row is filtered out as well. Confirm this is intended.
RetailSalesBody = FILTER InputFile BY NOT (InvoiceDate matches 'InvoiceDate');

-- Step 2: project the cleansed record layout.
--   InvoiceDate : original text with ':00' appended (presumably a seconds
--                 component; confirm against the raw date format).
--   TotalPrice  : UnitPrice * Quantity, scaled by 100, rounded, then scaled
--                 back so the line total is kept to two decimal places.
-- All other columns are passed through unchanged, in their original order.
RetailSalesClean = FOREACH RetailSalesBody GENERATE
    InvoiceNo,
    StockCode,
    Description,
    Quantity,
    CONCAT(InvoiceDate, ':00') AS (InvoiceDate:chararray),
    UnitPrice,
    ROUND(UnitPrice * Quantity * 100f) / 100f AS (TotalPrice:float),
    CustomerID,
    Country;
-- Persist the cleansed records as tab-delimited text.
STORE RetailSalesClean
    INTO '/user/admin/retail/retailsalesclean'
    USING PigStorage('\t');
-- Revenue by country: sum TotalPrice per Country, round the sum to a whole
-- number, rank the countries from highest to lowest revenue, and keep the
-- first ten rows of that ranking.
SalesByCountry = GROUP RetailSalesClean BY Country;

GeoRevenue = FOREACH SalesByCountry GENERATE
    group,
    ROUND(SUM(RetailSalesClean.TotalPrice)) AS TotalRevenueByCountry;

GeoRevenueRanked = ORDER GeoRevenue BY TotalRevenueByCountry DESC;
Top10GeoRevenue = LIMIT GeoRevenueRanked 10;

-- Write the (country, revenue) pairs as tab-delimited text.
STORE Top10GeoRevenue
    INTO '/user/admin/retail/georevenue'
    USING PigStorage('\t');