-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy path2_PreprocData.py
213 lines (185 loc) · 10.4 KB
/
2_PreprocData.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import sys
sys.path.append("eoas_pyutils")
sys.path.append("eoas_pyutils/hycom_utils/python")
from hycom.io import read_hycom_fields, read_hycom_coords, read_field_names
from datetime import datetime
from io_utils.io_netcdf import read_netcdf
from io_utils.io_common import create_folder
from viz_utils.eoa_viz import EOAImageVisualizer
from viz_utils.eoa_viz import PlotMode
from io_utils.dates_utils import get_days_from_month
import pandas as pd
from io_project.read_utils import *
import xarray as xr
import re
import numpy as np
from multiprocessing import Pool
from constants_proj.AI_proj_params import PreprocParams, ParallelParams
from config.PreprocConfig import get_preproc_config
# Not sure how to move this inside the function
NUM_PROC = None # Set below (in the __main__ guard); workers read it to pick their days
# Bounding box of the domain in degrees; currently only referenced by the
# commented-out np.linspace alternatives inside the proc_* functions below.
_minlat = 7.00250
_minlon = -98.08
_maxlat = 31.9267
_maxlon = -56.08
# # Here we identify lat and lons before hand. TODO improve this making it local to functions
coords_file = "/nexsan/people/abozec/TSIS/GOMb0.04/topo/regional.grid.a"
# coord_fields = [ 'plon','plat','qlon','qlat','ulon','ulat','vlon','vlat']
coord_fields = ['plon','plat']
print("Reading hycom coordinates....")
# Module-level side effect: the grid is read once at import time so every
# worker process forked later shares the same 1-D coordinate vectors.
hycom_coords = read_hycom_coords(coords_file, coord_fields)
_lats = hycom_coords['plat'][:,0]  # assumes plat varies only along axis 0 — TODO confirm
_lons = hycom_coords['plon'][0,:]  # assumes plon varies only along axis 1 — TODO confirm
print("Done!")
def preproc_data(proc_id):
    """
    Preprocess this worker's share of the daily data.

    The paralellization is done by days: each process handles the days of the
    month whose index modulo NUM_PROC equals proc_id (up to 30 workers).
    This function preprocess the desired data. It does the following:
    1) Looks for dates where there is 'increment', model, and observations data.
    2) Saves the files on the same folder with only the 'desired' fields in netcdf format
       (files named increment_*, model_* and obs_*).
    :param proc_id: zero-based id of this worker (0 .. NUM_PROC-1)
    :return: None; output NetCDF files are written as a side effect
    """
    print("Preprocessing data....")
    config = get_preproc_config()
    # Input/output locations plus the field/layer selection, from project config
    input_folder_increment = config[PreprocParams.input_folder_tsis]
    input_folder_model = config[PreprocParams.input_folder_hycom]
    input_folder_obs = config[PreprocParams.input_folder_obs]
    output_folder = config[PreprocParams.output_folder]
    YEARS = config[PreprocParams.YEARS]
    MONTHS = config[PreprocParams.MONTHS]
    fields = config[PreprocParams.fields_names]
    obs_fields = config[PreprocParams.fields_names_obs]
    layers = config[PreprocParams.layers_to_plot]
    # NOTE(review): img_viz is created but never used in this function
    img_viz = EOAImageVisualizer(output_folder=output_folder, disp_images=False)
    # Create folder form io_utils.io_common
    create_folder(output_folder)
    # These are the data assimilated files
    for c_year in YEARS:
        for c_month in MONTHS:
            print(F"=============== Year: {c_year} Month: {c_month} ===========")
            # days_of_month is unused below; days_of_year drives the file regexes
            days_of_month, days_of_year = get_days_from_month(c_month)
            # Rads all the files for this month
            da_files, da_paths = get_hycom_file_name(input_folder_increment, c_year, c_month)
            hycom_files, hycom_paths = get_hycom_file_name(input_folder_model, c_year, c_month, day_idx=2)
            obs_files, obs_paths = get_obs_file_names(input_folder_obs, c_year, c_month)
            # This for is fixed to be able to run in parallel
            for c_day_of_month, c_day_of_year in enumerate(days_of_year):
                # Round-robin split of the days across the NUM_PROC workers
                if (c_day_of_month % NUM_PROC) == proc_id:
                    # Filename regexes for this date.
                    # NOTE(review): non-raw strings — '\S' only works because
                    # Python leaves unknown escapes intact; prefer raw strings.
                    re_increment = F'incupd.{c_year}_{c_day_of_year:03d}\S*.a'
                    re_model = F'archv.{c_year}_{c_day_of_year:03d}\S*.a'
                    re_obs = F'tsis_obs_gomb4_{c_year}{c_month:02d}{c_day_of_month+1:02d}\S*.nc'
                    try:
                        # First filename matching the regex; IndexError if none
                        da_file_idx = [i for i, file in enumerate(da_files) if re.search(re_increment, file) != None][0]
                        print(F" =============== Working with (hycom) : {da_files[da_file_idx]} Proc_id={proc_id} ============= ")
                        da_np_fields = read_hycom_fields(da_paths[da_file_idx], fields, layers=layers)
                        hycom_file_idx = [i for i, file in enumerate(hycom_files) if re.search(re_model, file) != None][0]
                        hycom_np_fields = read_hycom_fields(hycom_paths[hycom_file_idx], fields, layers=layers)
                        # --------- Preprocessing Increment (TSIS) -------------
                        proc_increment_data(da_np_fields, hycom_np_fields, fields,
                                            join(output_folder, F"increment_{c_year}_{c_day_of_year:03d}.nc"))
                    except Exception as e:
                        print(F"Warning: Increment file for date {c_year}-{c_month}-{c_day_of_month} ({re_increment}) doesn't exist: {e}")
                        # Only when the increment file is not found we go to the next day.
                        continue
                    try:
                        print(F" --------------- Working with (increment): {hycom_files[hycom_file_idx]} ------------- ")
                        # NOTE(review): hycom_file_idx/hycom_np_fields were already
                        # computed in the previous try block; this recompute is redundant
                        hycom_file_idx = [i for i, file in enumerate(hycom_files) if re.search(re_model, file) != None][0]
                        hycom_np_fields = read_hycom_fields(hycom_paths[hycom_file_idx], fields, layers=layers)
                        # --------- Preprocessing HYCOM data -------------
                        proc_model_data(hycom_np_fields, fields, join(output_folder, F"model_{c_year}_{c_day_of_year:03d}.nc"))
                    except Exception as e:
                        print(F"Warning: HYCOM file for date {c_year}-{c_month}-{c_day_of_month} ({re_model}) doesn't exist: {e}")
                    try:
                        obs_file_idx = [i for i, file in enumerate(obs_files) if re.search(re_obs, file) != None][0]
                        # --------- Preprocessing observed data -------------
                        # NOTE(review): prints the hycom file name, not the obs
                        # file — looks like a copy-paste slip in the log message
                        print(F" --------------- Working with (obs): {hycom_files[hycom_file_idx]} ------------- ")
                        obs_ds = xr.load_dataset(obs_paths[obs_file_idx])
                        # Accumulate the requested observation variables into one dataset
                        for id_field, c_obs_field in enumerate(obs_fields):
                            if id_field == 0:
                                preproc_obs_ds = obs_ds[c_obs_field].to_dataset()
                            else:
                                preproc_obs_ds = preproc_obs_ds.merge(obs_ds[c_obs_field].to_dataset())
                        # --------------- Here we add the fields from the profiles (PIEAS) as gridded data -----------
                        # temp_group = 0
                        # saln_group = 1
                        # sst_p = np.zeros(preproc_obs_ds[c_obs_field].values.shape)
                        # sss_p = np.zeros(sst_p.shape)
                        # profiles = obs_ds.val
                        # tot_profiles = profiles.shape[0]
                        # obs_groups = obs_ds.ob_grp_present
                        # lons_i = obs_ds.grdi.values[:, 0, 0]
                        # lats_i = obs_ds.grdj.values[:, 0, 0]
                        # for i_group, c_type in enumerate(obs_groups):
                        #     if c_type == saln_group or c_type == temp_group:
                        #         for c_profile_i in range(tot_profiles):
                        #             c_data = profiles[c_profile_i, -1, i_group]
                        #             if c_type == saln_group:
                        #                 sss_p[int(lats_i[c_profile_i]), int(lons_i[c_profile_i])] = c_data
                        #             if c_type == temp_group:
                        #                 sst_p[int(lats_i[c_profile_i]), int(lons_i[c_profile_i])] = c_data
                        # print(F"Max value: {np.amax(sst_p)}")
                        # print(F"Max value s: {np.amax(sss_p)}")
                        # preproc_obs_ds['sst_p'] = xr.DataArray(sst_p, dims=['yc', 'xc'])
                        # preproc_obs_ds['sss_p'] = xr.DataArray(sss_p, dims=['yc', 'xc'])
                        preproc_obs_ds.to_netcdf(join(output_folder, F"obs_{c_year}_{c_day_of_year:03d}.nc"))
                    except Exception as e:
                        print(F"Warning: OBS file for date {c_year}-{c_month}-{c_day_of_month} doesn't exist: {e}")
def proc_increment_data(increment_fields, model_fields, field_names, file_name):
    """
    Write the assimilation increment (TSIS minus model) to a NetCDF file.

    For every requested field, the first stored layer of the model data is
    subtracted from the corresponding increment layer, and the differences
    are stored on the module-level hycom lat/lon grid together with 2-D
    LON/LAT helper variables.

    :param increment_fields: dict of 3-D arrays (layer, row, col) from TSIS
    :param model_fields: dict of 3-D arrays (layer, row, col) from HYCOM
    :param field_names: names of the fields to export
    :param file_name: destination NetCDF path
    """
    sample = increment_fields[field_names[0]]
    rows, cols = sample.shape[1], sample.shape[2]
    lats, lons = _lats, _lons
    # One variable per field: increment minus model, first stored layer only
    diff_vars = {
        name: (("lat", "lon"), increment_fields[name][0] - model_fields[name][0])
        for name in field_names
    }
    increment_ds = xr.Dataset(diff_vars, {"lat": lats, "lon": lons})
    increment_ds = addLatLon(increment_ds, lats, lons, rows, cols)
    increment_ds.to_netcdf(file_name)
def addLatLon(ds, lats, lons, rows, cols):
    """
    Return *ds* with 2-D "LON" and "LAT" variables merged in.

    Each variable has shape (rows, cols) on the ("lat", "lon") dims:
    LON repeats the 1-D lons vector on every row, LAT repeats the 1-D
    lats vector down every column.
    """
    grid_coords = {"lat": lats, "lon": lons}
    # Broadcast the 1-D vectors to full 2-D grids (float64 promotion,
    # same as multiplying by np.ones)
    lon_grid = np.outer(np.ones(rows), lons)
    lat_grid = np.outer(lats, np.ones(cols))
    ds = ds.merge(xr.Dataset({"LON": (("lat", "lon"), lon_grid)}, grid_coords))
    ds = ds.merge(xr.Dataset({"LAT": (("lat", "lon"), lat_grid)}, grid_coords))
    return ds
def proc_model_data(np_fields, field_names, file_name):
    """
    Write the first stored layer of each requested model field to NetCDF.

    Fields are placed on the module-level hycom lat/lon grid, with 2-D
    LON/LAT helper variables added before saving.

    :param np_fields: dict of 3-D arrays (layer, row, col) keyed by field name
    :param field_names: names of the fields to export
    :param file_name: destination NetCDF path
    """
    sample = np_fields[field_names[0]]
    rows, cols = sample.shape[1], sample.shape[2]
    lats, lons = _lats, _lons
    # One variable per field, keeping only the first stored layer
    surface_vars = {name: (("lat", "lon"), np_fields[name][0]) for name in field_names}
    model_ds = xr.Dataset(surface_vars, {"lat": lats, "lon": lons})
    model_ds = addLatLon(model_ds, lats, lons, rows, cols)
    model_ds.to_netcdf(file_name)
if __name__ == '__main__':
    # Workers read the module-level NUM_PROC to pick their share of days.
    # NOTE(review): this relies on the 'fork' start method so children inherit
    # the value; under 'spawn' (Windows/macOS default) NUM_PROC would still be
    # None in the workers — confirm the target platform.
    NUM_PROC = 30
    # ----------- Parallel -------
    # Context manager terminates and joins the pool deterministically
    # (the original Pool was never closed).
    with Pool(NUM_PROC) as p:
        p.map(preproc_data, range(NUM_PROC))
    # preproc_data(0)