import logging
import configparser
import time

import boto3
import psycopg2
import psycopg2.extensions as psycopg2Ext

from sql_queries import copy_table_queries, insert_table_queries

logger = logging.getLogger(__name__)


def load_staging_tables(
    SCHEMA_NAME: str,
    S3_LOG_DATA: str,
    S3_LOG_JSONPATH: str,
    S3_SONG_DATA: str,
    roleArn: str,
    cur: psycopg2Ext.cursor,
    conn: psycopg2Ext.connection,
) -> None:
    """
    Description: Load partitioned data from S3 into the staging tables on the cluster.

    Arguments:
        SCHEMA_NAME (str): schema name
        S3_LOG_DATA (str): log data path in S3
        S3_LOG_JSONPATH (str): JSONPaths file path in S3
        S3_SONG_DATA (str): song data path in S3
        roleArn (str): IAM role ARN
        cur (psycopg2Ext.cursor): cursor object
        conn (psycopg2Ext.connection): connection object

    Returns:
        None
    """
    queries = [
        copy_table_queries[0].format(
            SCHEMA_NAME, S3_LOG_DATA, roleArn, S3_LOG_JSONPATH
        ),
        copy_table_queries[1].format(SCHEMA_NAME, S3_SONG_DATA, roleArn),
    ]
    print("Copying data from S3 to staging Redshift tables...")
    for query in queries:
        try:
            cur.execute(query)
            conn.commit()
        except psycopg2.Error as e:
            msg = f"ERROR: Could not copy table with query: {query}"
            logger.warning("%s: %s", msg, e)
            continue


def insert_tables(cur: psycopg2Ext.cursor, conn: psycopg2Ext.connection) -> None:
    """
    Description: Insert data from staging tables to final tables.

    Arguments:
        cur (psycopg2Ext.cursor): cursor object
        conn (psycopg2Ext.connection): connection object

    Returns:
        None
    """
print("Inserting data from staging to final tables...")
for query in insert_table_queries:
try:
cur.execute(query)
conn.commit()
except psycopg2.Error as e:
msg = f"ERROR: Could not insert data into table with query: {query}"
logger.warning(msg, e)
return


def test_queries(
    SCHEMA_NAME: str, cur: psycopg2Ext.cursor, conn: psycopg2Ext.connection
) -> None:
    """
    Description: Test queries to make sure data is successfully inserted.

    Arguments:
        SCHEMA_NAME (str): schema
        cur (psycopg2Ext.cursor): cursor object
        conn (psycopg2Ext.connection): connection object

    Returns:
        None
    """
    for query in insert_table_queries:
        # The target table name sits between "INTO" and the opening parenthesis
        # of the column list in each INSERT statement.
        tbl_name = query[query.find("INTO") + len("INTO") : query.find("(")].strip()
        test_query = f"SELECT * FROM {tbl_name} LIMIT 5;"
        print(f"\n==================== TEST -- {tbl_name} ====================")
        print(f"Query: `{test_query}`")
        try:
            cur.execute(test_query)
        except psycopg2.Error as e:
            msg = f"Could not query table `{tbl_name}`"
            logger.warning("%s: %s", msg, e)
            conn.commit()
            continue
        try:
            data = cur.fetchall()
        except psycopg2.Error as e:
            msg = f"Could not fetch data from table `{tbl_name}`"
            logger.warning("%s: %s", msg, e)
            conn.commit()
            continue
        for row in data:
            print(row)
        conn.commit()
    return


def main() -> None:
    """
    Description: Set up the AWS clients (IAM and Redshift),
    connect to the Redshift cluster, load data from S3 into staging tables
    in Redshift, and insert data from the staging tables into the final tables.
    Lastly, delete the Redshift cluster, detach the IAM role policy and
    delete the IAM role.

    Returns:
        None
    """
    config = configparser.ConfigParser()
    config.read("dwh.cfg")

    # Load DWH params from file
    KEY = config.get("AWS", "KEY")
    SECRET = config.get("AWS", "SECRET")
    SCHEMA_NAME = config.get("AWS", "SCHEMA_NAME")
    DWH_CLUSTER_IDENTIFIER = config.get("DWH", "DWH_CLUSTER_IDENTIFIER")
    DWH_DB = config.get("DWH", "DWH_DB")
    DWH_DB_USER = config.get("DWH", "DWH_DB_USER")
    DWH_DB_PASSWORD = config.get("DWH", "DWH_DB_PASSWORD")
    DWH_PORT = config.get("DWH", "DWH_PORT")
    DWH_IAM_ROLE_NAME = config.get("DWH", "DWH_IAM_ROLE_NAME")
    DWH_POLICY_ARN = config.get("DWH", "DWH_POLICY_ARN")
    S3_LOG_DATA = config.get("S3", "LOG_DATA")
    S3_LOG_JSONPATH = config.get("S3", "LOG_JSONPATH")
    S3_SONG_DATA = config.get("S3", "SONG_DATA")

    # set up IAM client
    iam = boto3.client(
        "iam",
        region_name="us-west-2",
        aws_access_key_id=KEY,
        aws_secret_access_key=SECRET,
    )
    # set up Redshift client
    redshift = boto3.client(
        "redshift",
        region_name="us-west-2",
        aws_access_key_id=KEY,
        aws_secret_access_key=SECRET,
    )
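    # Look up the existing cluster so its endpoint address can be used for the
    # database connection below.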
    clusterProp = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)[
        "Clusters"
    ][0]
    # cluster endpoint
    dwh_endpoint = clusterProp["Endpoint"]["Address"]

    # connect to cluster
    try:
        conn = psycopg2.connect(
            f"host={dwh_endpoint} dbname={DWH_DB} user={DWH_DB_USER} "
            f"password={DWH_DB_PASSWORD} port={DWH_PORT}"
        )
    except psycopg2.Error as e:
        msg = "ERROR: Could not make connection to the dwh."
        logger.warning("%s: %s", msg, e)
        return
    try:
        cur = conn.cursor()
    except psycopg2.Error as e:
        msg = "ERROR: Could not get cursor to the sparkify database."
        logger.warning("%s: %s", msg, e)
        return
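    # ARN of the IAM role that the COPY statements use so Redshift can read from S3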
    roleArn = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)["Role"]["Arn"]

    # load staging tables
    load_staging_tables(
        SCHEMA_NAME, S3_LOG_DATA, S3_LOG_JSONPATH, S3_SONG_DATA, roleArn, cur, conn
    )
    # insert from staging to fact/dim tables
    insert_tables(cur, conn)
    # test queries
    test_queries(SCHEMA_NAME, cur, conn)
    conn.close()
print("Deleting cluster...")
# delete cluster
redshift.delete_cluster(
ClusterIdentifier=DWH_CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True
)
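    # Poll the cluster status until describe_clusters fails, which means the
    # cluster no longer exists.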
    clusterProp = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)[
        "Clusters"
    ][0]
    clusterStatus = clusterProp["ClusterStatus"]
    while clusterStatus == "deleting":
        # wait between polls to avoid hammering the API
        time.sleep(30)
        try:
            clusterProp = redshift.describe_clusters(
                ClusterIdentifier=DWH_CLUSTER_IDENTIFIER
            )["Clusters"][0]
            clusterStatus = clusterProp["ClusterStatus"]
        except Exception:
            # the cluster can no longer be described, i.e. deletion finished
            break
    print("Cluster deleted successfully.")
    # detach role policy and delete role
    iam.detach_role_policy(RoleName=DWH_IAM_ROLE_NAME, PolicyArn=DWH_POLICY_ARN)
    iam.delete_role(RoleName=DWH_IAM_ROLE_NAME)


if __name__ == "__main__":
    main()