-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetl.py
69 lines (47 loc) · 1.52 KB
/
etl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import configparser
import psycopg2
from sql_queries import copy_table_queries, insert_table_queries
def load_staging_tables(cur, conn):
    """
    Copy the JSON source data from S3 into the Redshift staging tables.

    Executes every COPY statement in ``copy_table_queries``, committing
    after each one so a long-running load is persisted incrementally.

    Arguments:
        cur: the database cursor object used to execute the queries.
        conn: the open database connection, used to commit each COPY.

    Returns:
        None
    """
    for query in copy_table_queries:
        cur.execute(query)
        conn.commit()
def insert_tables(cur, conn):
    """
    Populate the final analytics tables from the staging tables.

    Executes every INSERT statement in ``insert_table_queries``,
    committing after each one.

    Arguments:
        cur: the database cursor object used to execute the queries.
        conn: the open database connection, used to commit each INSERT.

    Returns:
        None
    """
    for query in insert_table_queries:
        cur.execute(query)
        conn.commit()
def main():
    """
    Entry point for the ETL pipeline.

    Reads the cluster credentials from ``dwh.cfg``, opens a connection to
    the data warehouse, loads the staging tables from S3, then fills the
    final tables from the staging tables. The connection is always closed,
    even if a query fails.

    Arguments:
        None

    Returns:
        None
    """
    config = configparser.ConfigParser()
    config.read('dwh.cfg')

    # NOTE(review): this relies on the [CLUSTER] section listing host,
    # dbname, user, password and port in exactly that order, since
    # .values() yields them in file order -- confirm dwh.cfg matches.
    conn = psycopg2.connect(
        "host={} dbname={} user={} password={} port={}".format(
            *config['CLUSTER'].values()
        )
    )
    try:
        cur = conn.cursor()
        load_staging_tables(cur, conn)
        insert_tables(cur, conn)
    finally:
        # Close the connection even when a query raises; the original
        # leaked it on any error between connect() and close().
        conn.close()


if __name__ == "__main__":
    main()