-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathluigi_etl.py
60 lines (43 loc) · 1.63 KB
/
luigi_etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import csv
import sys

import luigi
import pandas as pd
from sqlalchemy import create_engine

# NOTE(review): this path is resolved relative to the current working
# directory, not this file; confirm it is still needed by something imported later.
sys.path.insert(0, '../luigi-etl')
class QueryDB1(luigi.Task):
    """Extract every row of the ``names`` table in the ``db1`` SQLite file to CSV."""

    def requires(self):
        """Return no dependencies: this task reads straight from the database."""
        return []

    def output(self):
        """Return the CSV target that holds the ``names`` extract.

        The target is declared UTF-8 explicitly. ``DataFrame.to_csv`` ignores
        its ``encoding`` argument when given an already-open text handle, so
        the on-disk encoding is decided by the target's format, not by pandas.
        """
        return luigi.LocalTarget("DB1_output.csv", format=luigi.format.UTF8)

    def run(self):
        """Query ``names`` and write it as a header-bearing, quoted CSV."""
        engine = create_engine('sqlite:///db1')
        try:
            results = pd.read_sql_query('SELECT * from names', engine)
        finally:
            # Release the engine's pooled connections once the query is done.
            engine.dispose()
        # The with-block closes (and so publishes) the target on success;
        # non-numeric fields are quoted (csv.QUOTE_NONNUMERIC == 2).
        with self.output().open('w') as f:
            results.to_csv(f, index=False, header=True,
                           quoting=csv.QUOTE_NONNUMERIC)
class QueryDB2(luigi.Task):
    """Extract every row of the ``salaries`` table in the ``db2`` SQLite file to CSV."""

    def requires(self):
        """Return no dependencies: this task reads straight from the database."""
        return []

    def output(self):
        """Return the CSV target that holds the ``salaries`` extract.

        The target is declared UTF-8 explicitly. ``DataFrame.to_csv`` ignores
        its ``encoding`` argument when given an already-open text handle, so
        the on-disk encoding is decided by the target's format, not by pandas.
        """
        return luigi.LocalTarget("DB2_output.csv", format=luigi.format.UTF8)

    def run(self):
        """Query ``salaries`` and write it as a header-bearing, quoted CSV."""
        engine = create_engine('sqlite:///db2')
        try:
            results = pd.read_sql_query('SELECT * from salaries', engine)
        finally:
            # Release the engine's pooled connections once the query is done.
            engine.dispose()
        # The with-block closes (and so publishes) the target on success;
        # non-numeric fields are quoted (csv.QUOTE_NONNUMERIC == 2).
        with self.output().open('w') as f:
            results.to_csv(f, index=False, header=True,
                           quoting=csv.QUOTE_NONNUMERIC)
class CreateReport(luigi.Task):
    """Join the ``names`` and ``salaries`` extracts on ``id`` into Report.csv."""

    def requires(self):
        """Depend on both database extracts, in (names, salaries) order."""
        return [QueryDB1(), QueryDB2()]

    def output(self):
        """Return the UTF-8 CSV target for the merged report."""
        return luigi.LocalTarget("./Report.csv", format=luigi.format.UTF8)

    def run(self):
        """Inner-join the two upstream CSVs on ``id`` and write the result."""
        # self.input() mirrors requires(), so this unpacks in the same order.
        # Reading the upstream targets' paths keeps the file names in one place
        # (each task's output()) instead of re-typing them here.
        names_target, salaries_target = self.input()
        df1 = pd.read_csv(names_target.path, header=0, encoding='utf-8',
                          index_col=False)
        df2 = pd.read_csv(salaries_target.path, header=0, encoding='utf-8',
                          index_col=False)
        # Inner join: only ids present in both extracts reach the report.
        merged = pd.merge(df1, df2, how='inner', on=['id'])
        # Write the joined frame; the report must contain both extracts'
        # columns, not just the names extract (df1).
        with self.output().open('w') as f:
            merged.to_csv(f, index=False, header=True,
                          quoting=csv.QUOTE_NONNUMERIC)
if __name__ == '__main__':
    # Hand control to luigi's command-line entry point with CreateReport as
    # the default root task. local_scheduler=False means luigi will try to
    # use a central scheduler rather than an in-process one.
    luigi.run(
        main_task_cls=CreateReport,
        local_scheduler=False,
    )