-
Notifications
You must be signed in to change notification settings - Fork 1
/
write_csv.py
94 lines (75 loc) · 2.82 KB
/
write_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
import dotenv
import xarray as xr
from tqdm import tqdm
from joblib import Parallel, delayed
from timer import Timer
# Load variables from a .env file into the process environment before any of
# them are read below.
dotenv.load_dotenv()

# Directory prefix for the per-hour CSV files written by this script.
# This is None when the CSV_PATH environment variable is not set.
CSV_PATH = os.environ.get("CSV_PATH")
# NetCDF input files that weather_dataframe() opens together as one dataset.
# NOTE(review): the last two files are labelled 199305 while the first four
# cover 1995-03-01 through 1995-03-31 -- confirm the periods are meant to differ.
nc_filepaths = [
    "e5.oper.an.sfc.128_164_tcc.ll025sc.1995030100_1995033123.nc",
    "e5.oper.an.sfc.128_165_10u.ll025sc.1995030100_1995033123.nc",
    "e5.oper.an.sfc.128_166_10v.ll025sc.1995030100_1995033123.nc",
    "e5.oper.an.sfc.128_167_2t.ll025sc.1995030100_1995033123.nc",
    "e5_snowfall_199305.nc",
    "e5_total_precipitation_199305.nc",
]
# Maps the variable names found in the input datasets to the column names
# used in the output CSV (applied through DataFrame.rename).
cols_renamed = {
    "VAR_2T": "temperature_2m",
    "VAR_10U": "zonal_wind_10m",
    "VAR_10V": "meridional_wind_10m",
    "TCC": "total_cloud_cover",
    "tp": "total_precipitation",
    "sf": "snowfall",
}
def latlon_to_location_id(lats, lons, dlat=0.25, dlon=0.25, min_lat=-90, min_lon=-179.75):
    """Map grid-point coordinates to 1-based, row-major location ids.

    Each latitude row holds ``360 / dlon`` longitudes. The id of a point is
    ``lat_index * n_lons + lon_index + 1``, where the indices count grid
    steps from ``min_lat`` and ``min_lon``.

    Args:
        lats: Array-like of latitudes (anything supporting arithmetic,
            ``.round()`` and ``.astype``, e.g. a NumPy array or pandas Series).
        lons: Array-like of longitudes, same shape as ``lats``.
        dlat: Grid spacing in latitude.
        dlon: Grid spacing in longitude.
        min_lat: Latitude that maps to row index 0.
        min_lon: Longitude that maps to column index 0.

    Returns:
        Integer location ids with the same container type as the inputs.
    """
    # Round instead of truncating: a quotient such as 0.3 / 0.1 evaluates to
    # 2.9999999999999996, which a bare integer cast would turn into index 2.
    n_lons = round(360 / dlon)

    lat_indices = ((lats - min_lat) / dlat).round().astype(int)
    lon_indices = ((lons - min_lon) / dlon).round().astype(int)

    return lat_indices * n_lons + lon_indices + 1
def weather_dataframe(n):
    """Build the weather table for the time step at index ``n``.

    Opens every file in ``nc_filepaths`` as a single dataset, selects time
    index ``n`` and flattens it into a DataFrame. Temperature is converted
    from Kelvin to Celsius, precipitation and snowfall from metres to
    millimetres, longitudes above 180 are shifted down by 360, and a
    ``location_id`` column is derived from latitude and longitude.

    Args:
        n: Integer position along the dataset's ``time`` dimension.

    Returns:
        DataFrame with the columns ``time``, ``location_id``, ``latitude``,
        ``longitude``, ``temperature_2m``, ``zonal_wind_10m``,
        ``meridional_wind_10m``, ``total_cloud_cover``,
        ``total_precipitation`` and ``snowfall``, in that order.
    """
    # NOTE(review): the original indentation was lost; the timed region is
    # assumed to cover only the load step named in the message -- confirm.
    with Timer(f"Loading data for hour {n}"):
        # Close the underlying files as soon as the selected hour has been
        # turned into a DataFrame, so repeated calls do not leak handles.
        with xr.open_mfdataset(nc_filepaths) as ds:
            df = ds.isel(time=n).to_dataframe().reset_index()

    df.drop(columns=["utc_date"], inplace=True)
    df.rename(columns=cols_renamed, inplace=True)

    df["temperature_2m"] -= 273.15  # Kelvin to Celsius
    df["total_precipitation"] *= 1000  # m to mm
    df["snowfall"] *= 1000  # m to mm

    # Convert longitude from [0, 360) degrees East to (-180, 180] degrees
    # East: values above 180 are shifted down by 360, everything else is kept.
    lon_east = df["longitude"]
    df["longitude"] = lon_east.where(lon_east <= 180, lon_east - 360)

    df["location_id"] = latlon_to_location_id(df.latitude, df.longitude)

    # Fix the column order of the output.
    df = df[[
        "time",
        "location_id",
        "latitude",
        "longitude",
        "temperature_2m",
        "zonal_wind_10m",
        "meridional_wind_10m",
        "total_cloud_cover",
        "total_precipitation",
        "snowfall",
    ]]

    return df
def write_csv(df, filepath):
    """Write ``df`` to ``filepath`` as a CSV without header or index column.

    Datetime values are formatted as ``YYYY-MM-DD HH:MM:SS``. The write is
    wrapped in a ``Timer`` that is given the number of rows as ``n``.

    Args:
        df: DataFrame to serialise.
        filepath: Destination path of the CSV file.

    Returns:
        ``filepath``, unchanged.
    """
    row_count = df.shape[0]

    with Timer(f"Saving {filepath}", n=row_count):
        df.to_csv(
            filepath,
            header=False,
            index=False,
            date_format="%Y-%m-%d %H:%M:%S",
        )

    return filepath
def _tmp(n):
    """Build the table for time index ``n`` and save it as a CSV.

    Args:
        n: Time index passed to ``weather_dataframe``.

    Returns:
        Path of the CSV file that was written.
    """
    hourly_frame = weather_dataframe(n)
    destination = f"{CSV_PATH}/weather_hour{n}.csv"
    return write_csv(hourly_frame, destination)
if __name__ == "__main__":
    # Open one input file only to learn how many time steps there are, and
    # close it again instead of leaving the handle open for the whole run.
    with xr.open_dataset("e5.oper.an.sfc.128_167_2t.ll025sc.1995030100_1995033123.nc") as ds:
        n_hours = len(ds.time)

    # One job per time step; each job writes its own CSV file.
    Parallel(n_jobs=32)(
        delayed(_tmp)(n)
        for n in tqdm(range(n_hours))
    )

    # To process a single hour without the worker pool:
    # write_csv(weather_dataframe(0), f"{CSV_PATH}/weather_hour0.csv")