-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
29 lines (23 loc) · 815 Bytes
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from db import MySQLClient
import typer
from whalefinder import cleaner, obis, validate
pipeline = typer.Typer()
@pipeline.command('main')
def main(whale: str, startdate: str='', enddate: str=''):
"""
Pipeline orchestration
"""
api = obis.ApiClient()
handler = obis.ObisHandler(api, whale, startdate, enddate)
handler.batch_requests()
validator = validate.Validator(whale, startdate, enddate)
valid_data, error_data = validator.validate_response()
datacleaner = cleaner.WhaleDataCleaner(whale, valid_data, error_data, startdate, enddate)
df = datacleaner.process_and_save()
mysqlclient = MySQLClient()
with mysqlclient as client:
client.to_mysql(df)
if __name__ == '__main__':
print('Running pipeline')
pipeline()
print('Pipeline finished')