-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
91 lines (81 loc) · 3.09 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import os
import redis
import pandas as pd
import numpy as np
import json
from celery import Celery
from sdig.erddap.info import Info
import urllib.parse
import constants
import db
from celery.utils.log import get_task_logger
import ssl
ssl._create_default_https_context = ssl._create_unverified_context
logger = get_task_logger(__name__)
def flush():
    """Wipe every key from the shared Redis cache (all databases)."""
    constants.redis_instance.flushall()
def update_mission(mid, mission):
    """Fetch daily-thinned location rows for every drone in a mission.

    For each drone, queries its ERDDAP dataset for latitude, longitude,
    time and the DSG id variable (one point per day via orderByClosest),
    tags the rows with the mission id/title, and accumulates per-drone
    variable metadata.  The mission config — with variables, long names,
    units and the resolved DSG id merged in — is cached in Redis under
    the "mission" hash.

    :param mid: mission id used to tag rows and as the Redis hash field
    :param mission: mission config dict; mutated in place with metadata
    :return: one DataFrame containing all drones' location rows
    """
    logger.debug('Pulling locations for mission ' + str(mission['ui']['title']))
    long_names = {}
    units = {}
    dsg_ids = []
    drones = mission['drones']
    mission_dfs = []
    for d in drones:
        logger.debug('Reading drone ' + str(d))
        drone = drones[d]
        info = Info(drone['url'])
        depth_name, dsg_var = info.get_dsg_info()
        # Name of the DSG id variable for this dataset's DSG type
        # (e.g. the trajectory id column).
        dsg_id = dsg_var[info.get_dsg_type()]
        base_url = drone['url'] + '.csv?'
        req_vars = 'latitude,longitude,time,' + dsg_id
        query = '&orderByClosest("time,1day")&'+dsg_id+'="'+d+'"'
        q = urllib.parse.quote(query)
        url = base_url + req_vars + q
        logger.debug(url)
        # skiprows=[1] drops the ERDDAP units row that follows the header.
        df = pd.read_csv(url, skiprows=[1])
        # Don't drop, just take the rows where lat or lon is not NA:
        df = df[df['latitude'].notna()]
        df = df[df['longitude'].notna()]
        df['mission_id'] = mid
        df['title'] = mission['ui']['title']
        df[dsg_id] = df[dsg_id].astype(str)
        drone_vars, d_long_names, d_units, standard_names, var_types = info.get_variables()
        drones[d]['variables'] = drone_vars
        # Record the DSG id already resolved above via get_dsg_type().
        # (Previously this re-called get_dsg_info() and hard-coded the
        # 'trajectory' key, which breaks on non-trajectory DSG types.)
        dsg_ids.append(dsg_id)
        long_names = {**long_names, **d_long_names}
        units = {**units, **d_units}
        mission_dfs.append(df)
    uids = list(set(dsg_ids))
    if len(uids) == 1:
        mission['dsg_id'] = uids[0]
    else:
        logger.warning('Mission has non-unique DSG ID names.')
    # Sort by long name (the dict value) for stable UI menu ordering.
    long_names = dict(sorted(long_names.items(), key=lambda item: item[1]))
    mission['long_names'] = long_names
    mission['units'] = units
    constants.redis_instance.hset("mission", mid, json.dumps(mission))
    full_df = pd.concat(mission_dfs).reset_index()
    return full_df
# Run this once from the workspace before deploying the application
def load_missions():
    """Load every configured mission's drone locations into Postgres.

    Reads config/missions.json, pulls each mission's location rows via
    update_mission (which also caches mission metadata in Redis), then
    writes the combined rows to the locations table, replacing any
    existing contents.
    """
    # Close the config file before starting the (slow) network pulls.
    with open('config/missions.json') as missions_config:
        config_json = json.load(missions_config)
    collections = config_json['collections']
    mission_frames = []
    for collection in collections:
        logger.info('Processing missions for ' + collection)
        member = collections[collection]
        for mid, mission in member['missions'].items():
            mission_frames.append(update_mission(mid, mission))
    logger.info('Setting the mission locations...')
    # Concatenate once at the end instead of growing a DataFrame inside
    # the loop, which copies all prior rows on every iteration.
    locations_df = pd.concat(mission_frames)
    locations_df.to_sql(constants.locations_table, constants.postgres_engine, if_exists='replace', index=False)