diff --git a/CHANGELOG.md b/CHANGELOG.md
index 03060332d..b60c3625a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,7 +13,7 @@ Code freeze date: YYYY-MM-DD
 ### Added
 - `climada.util.interpolation` module for inter- and extrapolation util functions used in local exceedance intensity and return period functions [#930](https://github.com/CLIMADA-project/climada_python/pull/930)
- 
+
 ### Changed
 - In `climada.util.plot.geo_im_from_array`, NaNs are plotted in gray while cells with no centroid are not plotted [#929](https://github.com/CLIMADA-project/climada_python/pull/929)
@@ -21,6 +21,7 @@ Code freeze date: YYYY-MM-DD
 ### Fixed
+- File handles are now closed after reading NetCDF files with `climada.hazard` modules [#953](https://github.com/CLIMADA-project/climada_python/pull/953)
 - Avoids a ValueError in the impact calculation for cases with a single exposure point and MDR values of 0, by explicitly removing zeros in `climada.hazard.Hazard.get_mdr` [#933](https://github.com/CLIMADA-project/climada_python/pull/948)
 ### Deprecated
@@ -471,4 +472,3 @@ updated:
 - `climada.enginge.impact.Impact.calc()` and `climada.enginge.impact.Impact.calc_impact_yearset()` [#436](https://github.com/CLIMADA-project/climada_python/pull/436).
-
diff --git a/climada/hazard/isimip_data.py b/climada/hazard/isimip_data.py
index 8630d59d7..f9c28e8e3 100644
--- a/climada/hazard/isimip_data.py
+++ b/climada/hazard/isimip_data.py
@@ -50,12 +50,12 @@ def _read_one_nc(file_name, bbox=None, years=None):
         Contains data in the specified bounding box and for the specified time
         period
     """
-    data = xr.open_dataset(file_name, decode_times=False)
-    if not bbox:
-        bbox = bbox_world
-    if not years:
-        return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]))
-
-    time_id = years - int(data['time'].units[12:16])
-    return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]),
-                    time=slice(time_id[0], time_id[1]))
+    with xr.open_dataset(file_name, decode_times=False) as data:
+        if not bbox:
+            bbox = bbox_world
+        if not years:
+            return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]))
+
+        time_id = years - int(data['time'].units[12:16])
+        return data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]),
+                        time=slice(time_id[0], time_id[1]))
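Both versions of `_read_one_nc` above return a lazy selection; with the context manager, the file handle is guaranteed to be released on exit. A minimal sketch of the same pattern (the helper name and the `bbox` layout are illustrative, mirroring the convention above) — if values are needed after the block exits, loading them eagerly inside it is the conservative choice:

```python
import xarray as xr

def read_box(file_name, bbox):
    """Illustrative helper: read a lat/lon box, then close the file handle."""
    with xr.open_dataset(file_name, decode_times=False) as data:
        subset = data.sel(lat=slice(bbox[3], bbox[1]), lon=slice(bbox[0], bbox[2]))
        # The selection is lazy; materialize it while the file is still open
        # so the values remain available after the handle is closed.
        return subset.load()
```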
diff --git a/climada/hazard/storm_europe.py b/climada/hazard/storm_europe.py
index 359c3d8a3..c49630a5a 100644
--- a/climada/hazard/storm_europe.py
+++ b/climada/hazard/storm_europe.py
@@ -228,37 +228,33 @@ def _read_one_nc(cls, file_name, centroids, intensity_thres):
         new_haz : StormEurope
             Hazard instance for one single storm.
         """
-        ncdf = xr.open_dataset(file_name)
-
-        if centroids.size != (ncdf.sizes['latitude'] * ncdf.sizes['longitude']):
-            ncdf.close()
-            LOGGER.warning(('Centroids size doesn\'t match NCDF dimensions. '
-                            'Omitting file %s.'), file_name)
-            return None
-
-        # xarray does not penalise repeated assignments, see
-        # http://xarray.pydata.org/en/stable/data-structures.html
-        stacked = ncdf['max_wind_gust'].stack(
-            intensity=('latitude', 'longitude', 'time')
-        )
-        stacked = stacked.where(stacked > intensity_thres)
-        stacked = stacked.fillna(0)
-
-        # fill in values from netCDF
-        ssi_wisc = np.array([float(ncdf.attrs['ssi'])])
-        intensity = sparse.csr_matrix(stacked)
-        new_haz = cls(ssi_wisc=ssi_wisc,
-                      intensity=intensity,
-                      event_name=[ncdf.attrs['storm_name']],
-                      date=np.array([datetime64_to_ordinal(ncdf['time'].data[0])]),
-                      # fill in default values
-                      centroids=centroids,
-                      event_id=np.array([1]),
-                      frequency=np.array([1]),
-                      orig=np.array([True]),)
-
-        ncdf.close()
-        return new_haz
+        with xr.open_dataset(file_name) as ncdf:
+            if centroids.size != (ncdf.sizes['latitude'] * ncdf.sizes['longitude']):
+                LOGGER.warning(('Centroids size doesn\'t match NCDF dimensions. '
+                                'Omitting file %s.'), file_name)
+                return None
+
+            # xarray does not penalise repeated assignments, see
+            # http://xarray.pydata.org/en/stable/data-structures.html
+            stacked = ncdf['max_wind_gust'].stack(
+                intensity=('latitude', 'longitude', 'time')
+            )
+            stacked = stacked.where(stacked > intensity_thres)
+            stacked = stacked.fillna(0)
+
+            # fill in values from netCDF
+            ssi_wisc = np.array([float(ncdf.attrs['ssi'])])
+            intensity = sparse.csr_matrix(stacked)
+            new_haz = cls(ssi_wisc=ssi_wisc,
+                          intensity=intensity,
+                          event_name=[ncdf.attrs['storm_name']],
+                          date=np.array([datetime64_to_ordinal(ncdf['time'].data[0])]),
+                          # fill in default values
+                          centroids=centroids,
+                          event_id=np.array([1]),
+                          frequency=np.array([1]),
+                          orig=np.array([True]),)
+            return new_haz
 
     def read_cosmoe_file(self, *args, **kwargs):
         """This function is deprecated, use StormEurope.from_cosmoe_file instead."""
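The stack/threshold/sparse sequence that now lives inside the `with` block can be exercised on synthetic data; a sketch (the threshold `25.0` stands in for `intensity_thres`):

```python
import numpy as np
import xarray as xr
from scipy import sparse

# Toy 2x2x1 cube standing in for the WISC 'max_wind_gust' variable.
gust = xr.DataArray(
    np.array([[[10.0], [20.0]], [[30.0], [40.0]]]),
    dims=("latitude", "longitude", "time"),
)
stacked = gust.stack(intensity=("latitude", "longitude", "time"))
stacked = stacked.where(stacked > 25.0)  # mask gusts at or below the threshold
stacked = stacked.fillna(0)              # sparse storage needs explicit zeros
intensity = sparse.csr_matrix(stacked)   # one event row, one column per point
```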
@@ -309,69 +305,66 @@ def from_cosmoe_file(cls, fp_file, run_datetime, event_date=None,
         intensity_thres = cls.intensity_thres if intensity_thres is None else intensity_thres
 
         # read intensity from file
-        ncdf = xr.open_dataset(fp_file)
-        ncdf = ncdf.assign_coords(date=('time',ncdf["time"].dt.floor("D").values))
-
-        if event_date:
-            try:
-                stacked = ncdf.sel(
-                    time=event_date.strftime('%Y-%m-%d')
-                ).groupby('date').max().stack(intensity=('y_1', 'x_1'))
-            except KeyError as ker:
-                raise ValueError('Extraction of date and coordinates failed. This is most likely '
-                                 'because the selected event_date '
-                                 f'{event_date.strftime("%Y-%m-%d")} is not contained in the '
-                                 'weather forecast selected by fp_file {fp_file}. Please adjust '
-                                 f'event_date or fp_file.') from ker
-            considered_dates = np.datetime64(event_date)
-        else:
-            time_covered_step = ncdf['time'].diff('time')
-            time_covered_day = time_covered_step.groupby('date').sum()
-            # forecast run should cover at least 18 hours of a day
-            considered_dates_bool = time_covered_day >= np.timedelta64(18,'h')
-            stacked = ncdf.groupby('date').max()\
-                          .sel(date=considered_dates_bool)\
-                          .stack(intensity=('y_1', 'x_1'))
-            considered_dates = stacked['date'].values
-        stacked = stacked.stack(date_ensemble=('date', 'epsd_1'))
-        stacked = stacked.where(stacked['VMAX_10M'] > intensity_thres)
-        stacked = stacked.fillna(0)
-
-        # fill in values from netCDF
-        intensity = sparse.csr_matrix(stacked['VMAX_10M'].T)
-        event_id = np.arange(stacked['date_ensemble'].size) + 1
-        date = np.repeat(
-            np.array(datetime64_to_ordinal(considered_dates)),
-            np.unique(ncdf['epsd_1']).size
-        )
-        orig = np.full_like(event_id, False)
-        orig[(stacked['epsd_1'] == 0).values] = True
-        if description is None:
-            description = (model_name +
-                           ' weather forecast windfield ' +
-                           'for run startet at ' +
-                           run_datetime.strftime('%Y%m%d%H'))
-
-        # Create Hazard
-        haz = cls(
-            intensity=intensity,
-            event_id=event_id,
-            centroids = cls._centroids_from_nc(fp_file),
-            # fill in default values
-            orig=orig,
-            date=date,
-            event_name=[date_i + '_ens' + str(ens_i)
-                        for date_i, ens_i
-                        in zip(date_to_str(date), stacked['epsd_1'].values + 1)],
-            frequency=np.divide(
-                np.ones_like(event_id),
-                np.unique(ncdf['epsd_1']).size),
-        )
+        with xr.open_dataset(fp_file) as ncdf:
+            ncdf = ncdf.assign_coords(date=('time',ncdf["time"].dt.floor("D").values))
+            if event_date:
+                try:
+                    stacked = ncdf.sel(
+                        time=event_date.strftime('%Y-%m-%d')
+                    ).groupby('date').max().stack(intensity=('y_1', 'x_1'))
+                except KeyError as ker:
+                    raise ValueError('Extraction of date and coordinates failed. This is most likely '
+                                     'because the selected event_date '
+                                     f'{event_date.strftime("%Y-%m-%d")} is not contained in the '
+                                     f'weather forecast selected by fp_file {fp_file}. Please adjust '
+                                     f'event_date or fp_file.') from ker
+                considered_dates = np.datetime64(event_date)
+            else:
+                time_covered_step = ncdf['time'].diff('time')
+                time_covered_day = time_covered_step.groupby('date').sum()
+                # forecast run should cover at least 18 hours of a day
+                considered_dates_bool = time_covered_day >= np.timedelta64(18,'h')
+                stacked = ncdf.groupby('date').max()\
+                    .sel(date=considered_dates_bool)\
+                    .stack(intensity=('y_1', 'x_1'))
+                considered_dates = stacked['date'].values
+            stacked = stacked.stack(date_ensemble=('date', 'epsd_1'))
+            stacked = stacked.where(stacked['VMAX_10M'] > intensity_thres)
+            stacked = stacked.fillna(0)
+
+            # fill in values from netCDF
+            intensity = sparse.csr_matrix(stacked['VMAX_10M'].T)
+            event_id = np.arange(stacked['date_ensemble'].size) + 1
+            date = np.repeat(
+                np.array(datetime64_to_ordinal(considered_dates)),
+                np.unique(ncdf['epsd_1']).size
+            )
+            orig = np.full_like(event_id, False)
+            orig[(stacked['epsd_1'] == 0).values] = True
+            if description is None:
+                description = (model_name +
+                               ' weather forecast windfield ' +
+                               'for run started at ' +
+                               run_datetime.strftime('%Y%m%d%H'))
+
+            # Create Hazard
+            haz = cls(
+                intensity=intensity,
+                event_id=event_id,
+                centroids = cls._centroids_from_nc(fp_file),
+                # fill in default values
+                orig=orig,
+                date=date,
+                event_name=[date_i + '_ens' + str(ens_i)
+                            for date_i, ens_i
+                            in zip(date_to_str(date), stacked['epsd_1'].values + 1)],
+                frequency=np.divide(
+                    np.ones_like(event_id),
+                    np.unique(ncdf['epsd_1']).size),
+            )
 
-        # close netcdf file
-        ncdf.close()
-        haz.check()
-        return haz
+            haz.check()
+            return haz
 
     def read_icon_grib(self, *args, **kwargs):
         """This function is deprecated, use StormEurope.from_icon_grib instead."""
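One detail in the rewritten error message above: with implicit string-literal concatenation, each fragment needs its own `f` prefix, otherwise its placeholders stay literal (the hunk adds the missing prefix to the `fp_file` fragment). A minimal illustration, with a hypothetical file name:

```python
fp_file = "cosmo-e_forecast.nc"  # hypothetical value
msg = (
    "selected by fp_file {fp_file}. "   # no f prefix: printed literally
    f"selected by fp_file {fp_file}. "  # f prefix: value interpolated
)
```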
@@ -444,11 +437,12 @@ def from_icon_grib(cls, run_datetime, event_date=None, model_name='icon-eu-eps',
             gripfile_path_i = Path(file_i[:-4])
             with open(file_i, 'rb') as source, open(gripfile_path_i, 'wb') as dest:
                 dest.write(bz2.decompress(source.read()))
-            ds_i = xr.open_dataset(gripfile_path_i, engine='cfgrib')
-            if ind_i == 0:
-                stacked = ds_i
-            else:
-                stacked = xr.concat([stacked,ds_i], 'valid_time')
+
+            with xr.open_dataset(gripfile_path_i, engine='cfgrib') as ds_i:
+                if ind_i == 0:
+                    stacked = ds_i
+                else:
+                    stacked = xr.concat([stacked,ds_i], 'valid_time')
 
         # create intensity matrix with max for each full day
         stacked = stacked.assign_coords(
@@ -524,35 +518,34 @@ def _centroids_from_nc(file_name):
         'longitude' variables in a netCDF file.
""" LOGGER.info('Constructing centroids from %s', file_name) - ncdf = xr.open_dataset(file_name) - create_meshgrid = True - if hasattr(ncdf, 'latitude'): - lats = ncdf['latitude'].data - lons = ncdf['longitude'].data - elif hasattr(ncdf, 'lat'): - lats = ncdf['lat'].data - lons = ncdf['lon'].data - elif hasattr(ncdf, 'lat_1'): - if len(ncdf['lon_1'].shape)>1 & \ - (ncdf['lon_1'].shape == ncdf['lat_1'].shape) \ - : - lats = ncdf['lat_1'].data.flatten() - lons = ncdf['lon_1'].data.flatten() + with xr.open_dataset(file_name) as ncdf: + create_meshgrid = True + if hasattr(ncdf, 'latitude'): + lats = ncdf['latitude'].data + lons = ncdf['longitude'].data + elif hasattr(ncdf, 'lat'): + lats = ncdf['lat'].data + lons = ncdf['lon'].data + elif hasattr(ncdf, 'lat_1'): + if len(ncdf['lon_1'].shape)>1 & \ + (ncdf['lon_1'].shape == ncdf['lat_1'].shape) \ + : + lats = ncdf['lat_1'].data.flatten() + lons = ncdf['lon_1'].data.flatten() + create_meshgrid = False + else: + lats = ncdf['lat_1'].data + lons = ncdf['lon_1'].data + elif hasattr(ncdf, 'clat'): + lats = ncdf['clat'].data + lons = ncdf['clon'].data + if ncdf['clat'].attrs['units']=='radian': + lats = np.rad2deg(lats) + lons = np.rad2deg(lons) create_meshgrid = False else: - lats = ncdf['lat_1'].data - lons = ncdf['lon_1'].data - elif hasattr(ncdf, 'clat'): - lats = ncdf['clat'].data - lons = ncdf['clon'].data - if ncdf['clat'].attrs['units']=='radian': - lats = np.rad2deg(lats) - lons = np.rad2deg(lons) - create_meshgrid = False - else: - raise AttributeError('netcdf file has no field named latitude or ' - 'other know abrivation for coordinates.') - ncdf.close() + raise AttributeError('netcdf file has no field named latitude or ' + 'other know abrivation for coordinates.') if create_meshgrid: lats, lons = np.array([np.repeat(lats, len(lons)), diff --git a/climada/hazard/tc_tracks.py b/climada/hazard/tc_tracks.py index fd8c333c6..519f93627 100644 --- a/climada/hazard/tc_tracks.py +++ b/climada/hazard/tc_tracks.py @@ -466,269 +466,269 @@ def from_ibtracs_netcdf(cls, provider=None, rescale_windspeeds=True, storm_id=No if additional_variables is None: additional_variables = [] - ibtracs_ds = xr.open_dataset(ibtracs_path) - ibtracs_date = ibtracs_ds.attrs["date_created"] - if (np.datetime64('today') - np.datetime64(ibtracs_date)).item().days > 180: - LOGGER.warning("The cached IBTrACS data set dates from %s (older " - "than 180 days). Very likely, a more recent version is available. " - "Consider manually removing the file %s and re-running " - "this function, which will download the most recent version of the " - "IBTrACS data set from the official URL.", ibtracs_date, ibtracs_path) - - match = np.ones(ibtracs_ds.sid.shape[0], dtype=bool) - if storm_id is not None: - if not isinstance(storm_id, list): - storm_id = [storm_id] - invalid_mask = np.array( - [re.match(r"[12][0-9]{6}[NS][0-9]{5}", s) is None for s in storm_id]) - if invalid_mask.any(): - invalid_sids = list(np.array(storm_id)[invalid_mask]) - raise ValueError("The following given IDs are invalid: %s%s" % ( - ", ".join(invalid_sids[:5]), - ", ..." if len(invalid_sids) > 5 else ".")) - storm_id = list(np.array(storm_id)[~invalid_mask]) - storm_id_encoded = [i.encode() for i in storm_id] - non_existing_mask = ~np.isin(storm_id_encoded, ibtracs_ds.sid.values) - if np.count_nonzero(non_existing_mask) > 0: - non_existing_sids = list(np.array(storm_id)[non_existing_mask]) - raise ValueError("The following given IDs are not in IBTrACS: %s%s" % ( - ", ".join(non_existing_sids[:5]), - ", ..." 
if len(non_existing_sids) > 5 else ".")) - storm_id_encoded = list(np.array(storm_id_encoded)[~non_existing_mask]) - match &= ibtracs_ds.sid.isin(storm_id_encoded) - if year_range is not None: - years = ibtracs_ds.sid.str.slice(0, 4).astype(int) - match &= (years >= year_range[0]) & (years <= year_range[1]) - if np.count_nonzero(match) == 0: - LOGGER.info('No tracks in time range (%s, %s).', *year_range) - if basin is not None: - match &= (ibtracs_ds.basin == basin.encode()).any(dim='date_time') - if np.count_nonzero(match) == 0: - LOGGER.info('No tracks in basin %s.', basin) - if genesis_basin is not None: - # Here, we only filter for the basin at *any* eye position. We will filter again later - # for the basin of the *first* eye position, but only after restricting to the valid - # time steps in the data. - match &= (ibtracs_ds.basin == genesis_basin.encode()).any(dim='date_time') - if np.count_nonzero(match) == 0: - LOGGER.info('No tracks in genesis basin %s.', genesis_basin) - - if np.count_nonzero(match) == 0: - LOGGER.info("IBTrACS doesn't contain any tracks matching the specified requirements.") - return cls() - - ibtracs_ds = ibtracs_ds.sel(storm=match) - ibtracs_ds['valid_t'] = ibtracs_ds['time'].notnull() - - if rescale_windspeeds: - for agency in IBTRACS_AGENCIES: - scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] - ibtracs_ds[f'{agency}_wind'] -= shift - ibtracs_ds[f'{agency}_wind'] /= scale + with xr.open_dataset(ibtracs_path) as ibtracs_ds: + ibtracs_date = ibtracs_ds.attrs["date_created"] + if (np.datetime64('today') - np.datetime64(ibtracs_date)).item().days > 180: + LOGGER.warning("The cached IBTrACS data set dates from %s (older " + "than 180 days). Very likely, a more recent version is available. " + "Consider manually removing the file %s and re-running " + "this function, which will download the most recent version of the " + "IBTrACS data set from the official URL.", ibtracs_date, ibtracs_path) + + match = np.ones(ibtracs_ds.sid.shape[0], dtype=bool) + if storm_id is not None: + if not isinstance(storm_id, list): + storm_id = [storm_id] + invalid_mask = np.array( + [re.match(r"[12][0-9]{6}[NS][0-9]{5}", s) is None for s in storm_id]) + if invalid_mask.any(): + invalid_sids = list(np.array(storm_id)[invalid_mask]) + raise ValueError("The following given IDs are invalid: %s%s" % ( + ", ".join(invalid_sids[:5]), + ", ..." if len(invalid_sids) > 5 else ".")) + storm_id = list(np.array(storm_id)[~invalid_mask]) + storm_id_encoded = [i.encode() for i in storm_id] + non_existing_mask = ~np.isin(storm_id_encoded, ibtracs_ds.sid.values) + if np.count_nonzero(non_existing_mask) > 0: + non_existing_sids = list(np.array(storm_id)[non_existing_mask]) + raise ValueError("The following given IDs are not in IBTrACS: %s%s" % ( + ", ".join(non_existing_sids[:5]), + ", ..." if len(non_existing_sids) > 5 else ".")) + storm_id_encoded = list(np.array(storm_id_encoded)[~non_existing_mask]) + match &= ibtracs_ds.sid.isin(storm_id_encoded) + if year_range is not None: + years = ibtracs_ds.sid.str.slice(0, 4).astype(int) + match &= (years >= year_range[0]) & (years <= year_range[1]) + if np.count_nonzero(match) == 0: + LOGGER.info('No tracks in time range (%s, %s).', *year_range) + if basin is not None: + match &= (ibtracs_ds.basin == basin.encode()).any(dim='date_time') + if np.count_nonzero(match) == 0: + LOGGER.info('No tracks in basin %s.', basin) + if genesis_basin is not None: + # Here, we only filter for the basin at *any* eye position. 
We will filter again later + # for the basin of the *first* eye position, but only after restricting to the valid + # time steps in the data. + match &= (ibtracs_ds.basin == genesis_basin.encode()).any(dim='date_time') + if np.count_nonzero(match) == 0: + LOGGER.info('No tracks in genesis basin %s.', genesis_basin) - if provider is None: - provider = ["official_3h"] + IBTRACS_AGENCIES - elif isinstance(provider, str): - provider = [provider] - - phys_vars = ['lat', 'lon', 'wind', 'pres', 'rmw', 'poci', 'roci'] - for tc_var in phys_vars: - if "official" in provider or "official_3h" in provider: - ibtracs_add_official_variable( - ibtracs_ds, tc_var, add_3h=("official_3h" in provider)) - - # set up dimension of agency-reported values in order of preference, including the - # newly created `official` and `official_3h` data if specified - ag_vars = [f'{ag}_{tc_var}' for ag in provider] - ag_vars = [ag_var for ag_var in ag_vars if ag_var in ibtracs_ds.data_vars.keys()] - if len(ag_vars) == 0: - ag_vars = [f'{provider[0]}_{tc_var}'] - ibtracs_ds[ag_vars[0]] = xr.full_like(ibtracs_ds[f'usa_{tc_var}'], np.nan) - all_vals = ibtracs_ds[ag_vars].to_array(dim='agency') - # argmax returns the first True (i.e. valid) along the 'agency' dimension - preferred_idx = all_vals.notnull().any(dim="date_time").argmax(dim='agency') - ibtracs_ds[tc_var] = all_vals.isel(agency=preferred_idx) - - selected_ags = np.array([v[:-len(f'_{tc_var}')].encode() for v in ag_vars]) - ibtracs_ds[f'{tc_var}_agency'] = ('storm', selected_ags[preferred_idx.values]) - - if tc_var == 'lon': - # Most IBTrACS longitudes are either normalized to [-180, 180] or to [0, 360], but - # some aren't normalized at all, so we have to make sure that the values are okay: - lons = ibtracs_ds[tc_var].values.copy() - lon_valid_mask = np.isfinite(lons) - lons[lon_valid_mask] = u_coord.lon_normalize(lons[lon_valid_mask], center=0.0) - ibtracs_ds[tc_var].values[:] = lons - - # Make sure that the longitude is always chosen positive if a track crosses the - # antimeridian: - crossing_mask = ((ibtracs_ds[tc_var] > 170).any(dim="date_time") - & (ibtracs_ds[tc_var] < -170).any(dim="date_time") - & (ibtracs_ds[tc_var] < 0)).values - ibtracs_ds[tc_var].values[crossing_mask] += 360 - - if interpolate_missing: - with warnings.catch_warnings(): - # Upstream issue, see https://github.com/pydata/xarray/issues/4167 - warnings.simplefilter(action="ignore", category=FutureWarning) + if np.count_nonzero(match) == 0: + LOGGER.info("IBTrACS doesn't contain any tracks matching the specified requirements.") + return cls() + + ibtracs_ds = ibtracs_ds.sel(storm=match) + ibtracs_ds['valid_t'] = ibtracs_ds['time'].notnull() + + if rescale_windspeeds: + for agency in IBTRACS_AGENCIES: + scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency] + ibtracs_ds[f'{agency}_wind'] -= shift + ibtracs_ds[f'{agency}_wind'] /= scale + + if provider is None: + provider = ["official_3h"] + IBTRACS_AGENCIES + elif isinstance(provider, str): + provider = [provider] + + phys_vars = ['lat', 'lon', 'wind', 'pres', 'rmw', 'poci', 'roci'] + for tc_var in phys_vars: + if "official" in provider or "official_3h" in provider: + ibtracs_add_official_variable( + ibtracs_ds, tc_var, add_3h=("official_3h" in provider)) + + # set up dimension of agency-reported values in order of preference, including the + # newly created `official` and `official_3h` data if specified + ag_vars = [f'{ag}_{tc_var}' for ag in provider] + ag_vars = [ag_var for ag_var in ag_vars if ag_var in ibtracs_ds.data_vars.keys()] + if 
len(ag_vars) == 0: + ag_vars = [f'{provider[0]}_{tc_var}'] + ibtracs_ds[ag_vars[0]] = xr.full_like(ibtracs_ds[f'usa_{tc_var}'], np.nan) + all_vals = ibtracs_ds[ag_vars].to_array(dim='agency') + # argmax returns the first True (i.e. valid) along the 'agency' dimension + preferred_idx = all_vals.notnull().any(dim="date_time").argmax(dim='agency') + ibtracs_ds[tc_var] = all_vals.isel(agency=preferred_idx) + + selected_ags = np.array([v[:-len(f'_{tc_var}')].encode() for v in ag_vars]) + ibtracs_ds[f'{tc_var}_agency'] = ('storm', selected_ags[preferred_idx.values]) + + if tc_var == 'lon': + # Most IBTrACS longitudes are either normalized to [-180, 180] or to [0, 360], but + # some aren't normalized at all, so we have to make sure that the values are okay: + lons = ibtracs_ds[tc_var].values.copy() + lon_valid_mask = np.isfinite(lons) + lons[lon_valid_mask] = u_coord.lon_normalize(lons[lon_valid_mask], center=0.0) + ibtracs_ds[tc_var].values[:] = lons + + # Make sure that the longitude is always chosen positive if a track crosses the + # antimeridian: + crossing_mask = ((ibtracs_ds[tc_var] > 170).any(dim="date_time") + & (ibtracs_ds[tc_var] < -170).any(dim="date_time") + & (ibtracs_ds[tc_var] < 0)).values + ibtracs_ds[tc_var].values[crossing_mask] += 360 + + if interpolate_missing: + with warnings.catch_warnings(): + # Upstream issue, see https://github.com/pydata/xarray/issues/4167 + warnings.simplefilter(action="ignore", category=FutureWarning) + + # don't interpolate if there is only a single record for this variable + nonsingular_mask = ( + ibtracs_ds[tc_var].notnull().sum(dim="date_time") > 1).values + if nonsingular_mask.sum() > 0: + ibtracs_ds[tc_var].values[nonsingular_mask] = ( + ibtracs_ds[tc_var].sel(storm=nonsingular_mask).interpolate_na( + dim="date_time", method="linear")) + ibtracs_ds = ibtracs_ds[['sid', 'name', 'basin', 'time', 'valid_t'] + + additional_variables + phys_vars + + [f'{v}_agency' for v in phys_vars]] - # don't interpolate if there is only a single record for this variable - nonsingular_mask = ( - ibtracs_ds[tc_var].notnull().sum(dim="date_time") > 1).values - if nonsingular_mask.sum() > 0: - ibtracs_ds[tc_var].values[nonsingular_mask] = ( - ibtracs_ds[tc_var].sel(storm=nonsingular_mask).interpolate_na( - dim="date_time", method="linear")) - ibtracs_ds = ibtracs_ds[['sid', 'name', 'basin', 'time', 'valid_t'] - + additional_variables + phys_vars - + [f'{v}_agency' for v in phys_vars]] - - if estimate_missing: - ibtracs_ds['pres'][:] = _estimate_pressure( - ibtracs_ds['pres'], ibtracs_ds['lat'], ibtracs_ds['lon'], ibtracs_ds['wind']) - ibtracs_ds['wind'][:] = _estimate_vmax( - ibtracs_ds['wind'], ibtracs_ds['lat'], ibtracs_ds['lon'], ibtracs_ds['pres']) - - ibtracs_ds['valid_t'] &= (ibtracs_ds['lat'].notnull() & ibtracs_ds['lon'].notnull() - & ibtracs_ds['wind'].notnull() & ibtracs_ds['pres'].notnull()) - valid_storms_mask = ibtracs_ds['valid_t'].any(dim="date_time") - invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] - if invalid_storms_idx.size > 0: - invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) - LOGGER.warning('%d storm events are discarded because no valid wind/pressure values ' - 'have been found: %s%s', len(invalid_sids), ", ".join(invalid_sids[:5]), - ", ..." 
if len(invalid_sids) > 5 else ".") - ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) - - if discard_single_points: - valid_storms_mask = ibtracs_ds['valid_t'].sum(dim="date_time") > 1 + if estimate_missing: + ibtracs_ds['pres'][:] = _estimate_pressure( + ibtracs_ds['pres'], ibtracs_ds['lat'], ibtracs_ds['lon'], ibtracs_ds['wind']) + ibtracs_ds['wind'][:] = _estimate_vmax( + ibtracs_ds['wind'], ibtracs_ds['lat'], ibtracs_ds['lon'], ibtracs_ds['pres']) + + ibtracs_ds['valid_t'] &= (ibtracs_ds['lat'].notnull() & ibtracs_ds['lon'].notnull() + & ibtracs_ds['wind'].notnull() & ibtracs_ds['pres'].notnull()) + valid_storms_mask = ibtracs_ds['valid_t'].any(dim="date_time") invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] if invalid_storms_idx.size > 0: invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) - LOGGER.warning('%d storm events are discarded because only one valid timestep ' - 'has been found: %s%s', len(invalid_sids), - ", ".join(invalid_sids[:5]), + LOGGER.warning('%d storm events are discarded because no valid wind/pressure values ' + 'have been found: %s%s', len(invalid_sids), ", ".join(invalid_sids[:5]), ", ..." if len(invalid_sids) > 5 else ".") ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) - if ibtracs_ds.dims['storm'] == 0: - LOGGER.info('After discarding IBTrACS events without valid values by the selected ' - 'reporting agencies, there are no tracks left that match the specified ' - 'requirements.') - return cls() + if discard_single_points: + valid_storms_mask = ibtracs_ds['valid_t'].sum(dim="date_time") > 1 + invalid_storms_idx = np.nonzero(~valid_storms_mask.data)[0] + if invalid_storms_idx.size > 0: + invalid_sids = list(ibtracs_ds.sid.sel(storm=invalid_storms_idx).astype(str).data) + LOGGER.warning('%d storm events are discarded because only one valid timestep ' + 'has been found: %s%s', len(invalid_sids), + ", ".join(invalid_sids[:5]), + ", ..." 
if len(invalid_sids) > 5 else ".") + ibtracs_ds = ibtracs_ds.sel(storm=valid_storms_mask) + + if ibtracs_ds.dims['storm'] == 0: + LOGGER.info('After discarding IBTrACS events without valid values by the selected ' + 'reporting agencies, there are no tracks left that match the specified ' + 'requirements.') + return cls() + + max_wind = ibtracs_ds['wind'].max(dim="date_time").data.ravel() + category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) + category = np.argmax(category_test, axis=1) - 1 + basin_map = {b.encode("utf-8"): v for b, v in BASIN_ENV_PRESSURE.items()} + basin_fun = lambda b: basin_map[b] - max_wind = ibtracs_ds['wind'].max(dim="date_time").data.ravel() - category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) - category = np.argmax(category_test, axis=1) - 1 - basin_map = {b.encode("utf-8"): v for b, v in BASIN_ENV_PRESSURE.items()} - basin_fun = lambda b: basin_map[b] + ibtracs_ds['id_no'] = (ibtracs_ds.sid.str.replace(b'N', b'0') + .str.replace(b'S', b'1') + .astype(float)) - ibtracs_ds['id_no'] = (ibtracs_ds.sid.str.replace(b'N', b'0') - .str.replace(b'S', b'1') - .astype(float)) + last_perc = 0 + all_tracks = [] + for i_track, t_msk in enumerate(ibtracs_ds['valid_t'].data): + perc = 100 * len(all_tracks) / ibtracs_ds.sid.size + if perc - last_perc >= 10: + LOGGER.info("Progress: %d%%", perc) + last_perc = perc + track_ds = ibtracs_ds.sel(storm=i_track, date_time=t_msk) + tr_basin_penv = xr.apply_ufunc(basin_fun, track_ds.basin, vectorize=True) + tr_genesis_basin = track_ds.basin.values[0].astype(str).item() - last_perc = 0 - all_tracks = [] - for i_track, t_msk in enumerate(ibtracs_ds['valid_t'].data): - perc = 100 * len(all_tracks) / ibtracs_ds.sid.size - if perc - last_perc >= 10: - LOGGER.info("Progress: %d%%", perc) - last_perc = perc - track_ds = ibtracs_ds.sel(storm=i_track, date_time=t_msk) - tr_basin_penv = xr.apply_ufunc(basin_fun, track_ds.basin, vectorize=True) - tr_genesis_basin = track_ds.basin.values[0].astype(str).item() + # Now that the valid time steps have been selected, we discard this track if it + # doesn't fit the specified basin definitions: + if genesis_basin is not None and tr_genesis_basin != genesis_basin: + continue + if basin is not None and basin.encode() not in track_ds.basin.values: + continue - # Now that the valid time steps have been selected, we discard this track if it - # doesn't fit the specified basin definitions: - if genesis_basin is not None and tr_genesis_basin != genesis_basin: - continue - if basin is not None and basin.encode() not in track_ds.basin.values: - continue + # A track that crosses the antimeridian in IBTrACS might be truncated by `t_msk` in + # such a way that the remaining part is not crossing the antimeridian: + if (track_ds['lon'].values > 180).all(): + track_ds['lon'] -= 360 - # A track that crosses the antimeridian in IBTrACS might be truncated by `t_msk` in - # such a way that the remaining part is not crossing the antimeridian: - if (track_ds['lon'].values > 180).all(): - track_ds['lon'] -= 360 - - # set time_step in hours - track_ds['time_step'] = xr.ones_like(track_ds['time'], dtype=float) - if track_ds['time'].size > 1: - track_ds['time_step'].values[1:] = (track_ds['time'].diff(dim="date_time") - / np.timedelta64(1, 'h')) - track_ds['time_step'].values[0] = track_ds['time_step'][1] - - with warnings.catch_warnings(): - # See https://github.com/pydata/xarray/issues/4167 - warnings.simplefilter(action="ignore", category=FutureWarning) - - track_ds['rmw'] = track_ds['rmw'] \ - 
.ffill(dim='date_time', limit=1) \ - .bfill(dim='date_time', limit=1) \ - .fillna(0) - track_ds['roci'] = track_ds['roci'] \ - .ffill(dim='date_time', limit=1) \ - .bfill(dim='date_time', limit=1) \ - .fillna(0) - track_ds['poci'] = track_ds['poci'] \ - .ffill(dim='date_time', limit=4) \ - .bfill(dim='date_time', limit=4) - # this is the most time consuming line in the processing: - track_ds['poci'] = track_ds['poci'].fillna(tr_basin_penv) + # set time_step in hours + track_ds['time_step'] = xr.ones_like(track_ds['time'], dtype=float) + if track_ds['time'].size > 1: + track_ds['time_step'].values[1:] = (track_ds['time'].diff(dim="date_time") + / np.timedelta64(1, 'h')) + track_ds['time_step'].values[0] = track_ds['time_step'][1] - if estimate_missing: - track_ds['rmw'][:] = estimate_rmw(track_ds['rmw'].values, track_ds['pres'].values) - track_ds['roci'][:] = estimate_roci(track_ds['roci'].values, - track_ds['pres'].values) - track_ds['roci'][:] = np.fmax(track_ds['rmw'].values, track_ds['roci'].values) - - # ensure environmental pressure >= central pressure - # this is the second most time consuming line in the processing: - track_ds['poci'][:] = np.fmax(track_ds['poci'], track_ds['pres']) - - provider_str = f"ibtracs_{provider[0]}" - if len(provider) > 1: - provider_str = "ibtracs_mixed:" + ",".join( - "{}({})".format(v, track_ds[f'{v}_agency'].astype(str).item()) - for v in phys_vars) - - data_vars = { - 'radius_max_wind': ('time', track_ds['rmw'].data), - 'radius_oci': ('time', track_ds['roci'].data), - 'max_sustained_wind': ('time', track_ds['wind'].data), - 'central_pressure': ('time', track_ds['pres'].data), - 'environmental_pressure': ('time', track_ds['poci'].data), - } - coords = { - 'time': ('time', track_ds['time'].dt.round('s').data), - 'lat': ('time', track_ds['lat'].data), - 'lon': ('time', track_ds['lon'].data), - } - attrs = { - 'max_sustained_wind_unit': 'kn', - 'central_pressure_unit': 'mb', - 'orig_event_flag': True, - 'data_provider': provider_str, - 'category': category[i_track], - } - # automatically assign the remaining variables as attributes or data variables - for varname in ["time_step", "basin", "name", "sid", "id_no"] + additional_variables: - values = track_ds[varname].data - if track_ds[varname].dtype.kind == "S": - # This converts the `bytes` (dtype "|S*") in IBTrACS to the more common `str` - # objects (dtype "= central pressure + # this is the second most time consuming line in the processing: + track_ds['poci'][:] = np.fmax(track_ds['poci'], track_ds['pres']) + + provider_str = f"ibtracs_{provider[0]}" + if len(provider) > 1: + provider_str = "ibtracs_mixed:" + ",".join( + "{}({})".format(v, track_ds[f'{v}_agency'].astype(str).item()) + for v in phys_vars) + + data_vars = { + 'radius_max_wind': ('time', track_ds['rmw'].data), + 'radius_oci': ('time', track_ds['roci'].data), + 'max_sustained_wind': ('time', track_ds['wind'].data), + 'central_pressure': ('time', track_ds['pres'].data), + 'environmental_pressure': ('time', track_ds['poci'].data), + } + coords = { + 'time': ('time', track_ds['time'].dt.round('s').data), + 'lat': ('time', track_ds['lat'].data), + 'lon': ('time', track_ds['lon'].data), + } + attrs = { + 'max_sustained_wind_unit': 'kn', + 'central_pressure_unit': 'mb', + 'orig_event_flag': True, + 'data_provider': provider_str, + 'category': category[i_track], + } + # automatically assign the remaining variables as attributes or data variables + for varname in ["time_step", "basin", "name", "sid", "id_no"] + additional_variables: + values = 
track_ds[varname].data + if track_ds[varname].dtype.kind == "S": + # This converts the `bytes` (dtype "|S*") in IBTrACS to the more common `str` + # objects (dtype "= year_range[0]) - & (chaz_ds['time'].dt.year <= year_range[1])).sel(lifelength=0) - if np.count_nonzero(match) == 0: - LOGGER.info('No tracks in time range (%s, %s).', *year_range) - continue - chaz_ds = chaz_ds.sel(id=match) - - # filter by ensembleNum if given - if ensemble_nums is not None: - match = np.isin(chaz_ds['ensembleNum'].values, ensemble_nums) - if np.count_nonzero(match) == 0: - LOGGER.info('No tracks with specified ensemble numbers.') - continue - chaz_ds = chaz_ds.sel(id=match) - - # remove invalid tracks from selection - chaz_ds['valid_t'] = chaz_ds['time'].notnull() & chaz_ds['Mwspd'].notnull() - valid_st = chaz_ds['valid_t'].any(dim="lifelength") - invalid_st = np.nonzero(~valid_st.data)[0] - if invalid_st.size > 0: - LOGGER.info('No valid Mwspd values found for %d out of %d storm tracks.', - invalid_st.size, valid_st.size) - chaz_ds = chaz_ds.sel(id=valid_st) - - # estimate central pressure from location and max wind - chaz_ds['pres'] = xr.full_like(chaz_ds['Mwspd'], -1, dtype=float) - chaz_ds['pres'][:] = _estimate_pressure( - chaz_ds['pres'], chaz_ds['latitude'], chaz_ds['longitude'], chaz_ds['Mwspd']) - - # compute time stepsizes - chaz_ds['time_step'] = xr.zeros_like(chaz_ds['time'], dtype=float) - chaz_ds['time_step'][1:, :] = (chaz_ds['time'].diff(dim="lifelength") - / np.timedelta64(1, 'h')) - chaz_ds['time_step'][0, :] = chaz_ds['time_step'][1, :] - - # determine Saffir-Simpson category - max_wind = chaz_ds['Mwspd'].max(dim="lifelength").data.ravel() - category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) - chaz_ds['category'] = ("id", np.argmax(category_test, axis=1) - 1) - - fname = Path(path).name - chaz_ds['time'][:] = chaz_ds['time'].dt.round('s').data - chaz_ds['radius_max_wind'] = xr.full_like(chaz_ds['pres'], np.nan) - chaz_ds['environmental_pressure'] = xr.full_like(chaz_ds['pres'], DEF_ENV_PRESSURE) - chaz_ds["track_name"] = ("id", [f"{fname}-{track_id.item()[1]}-{track_id.item()[0]}" - for track_id in chaz_ds['id']]) - - # add tracks one by one - last_perc = 0 - for cnt, i_track in enumerate(chaz_ds['id_no']): - perc = 100 * cnt / chaz_ds['id_no'].size - if perc - last_perc >= 10: - LOGGER.info("Progress: %d%%", perc) - last_perc = perc - track_ds = chaz_ds.sel(id=i_track['id'].item()) - track_ds = track_ds.sel(lifelength=track_ds['valid_t'].data) - data.append(xr.Dataset({ - 'time_step': ('time', track_ds['time_step'].values), - 'max_sustained_wind': ('time', track_ds['Mwspd'].values), - 'central_pressure': ('time', track_ds['pres'].values), - 'radius_max_wind': ('time', track_ds['radius_max_wind'].values), - 'environmental_pressure': ('time', track_ds['environmental_pressure'].values), - 'basin': ('time', np.full(track_ds['time'].size, "GB", dtype="= year_range[0]) + & (chaz_ds['time'].dt.year <= year_range[1])).sel(lifelength=0) + if np.count_nonzero(match) == 0: + LOGGER.info('No tracks in time range (%s, %s).', *year_range) + continue + chaz_ds = chaz_ds.sel(id=match) + + # filter by ensembleNum if given + if ensemble_nums is not None: + match = np.isin(chaz_ds['ensembleNum'].values, ensemble_nums) + if np.count_nonzero(match) == 0: + LOGGER.info('No tracks with specified ensemble numbers.') + continue + chaz_ds = chaz_ds.sel(id=match) + + # remove invalid tracks from selection + chaz_ds['valid_t'] = chaz_ds['time'].notnull() & chaz_ds['Mwspd'].notnull() + valid_st = 
chaz_ds['valid_t'].any(dim="lifelength") + invalid_st = np.nonzero(~valid_st.data)[0] + if invalid_st.size > 0: + LOGGER.info('No valid Mwspd values found for %d out of %d storm tracks.', + invalid_st.size, valid_st.size) + chaz_ds = chaz_ds.sel(id=valid_st) + + # estimate central pressure from location and max wind + chaz_ds['pres'] = xr.full_like(chaz_ds['Mwspd'], -1, dtype=float) + chaz_ds['pres'][:] = _estimate_pressure( + chaz_ds['pres'], chaz_ds['latitude'], chaz_ds['longitude'], chaz_ds['Mwspd']) + + # compute time stepsizes + chaz_ds['time_step'] = xr.zeros_like(chaz_ds['time'], dtype=float) + chaz_ds['time_step'][1:, :] = (chaz_ds['time'].diff(dim="lifelength") + / np.timedelta64(1, 'h')) + chaz_ds['time_step'][0, :] = chaz_ds['time_step'][1, :] + + # determine Saffir-Simpson category + max_wind = chaz_ds['Mwspd'].max(dim="lifelength").data.ravel() + category_test = (max_wind[:, None] < np.array(SAFFIR_SIM_CAT)[None]) + chaz_ds['category'] = ("id", np.argmax(category_test, axis=1) - 1) + + fname = Path(path).name + chaz_ds['time'][:] = chaz_ds['time'].dt.round('s').data + chaz_ds['radius_max_wind'] = xr.full_like(chaz_ds['pres'], np.nan) + chaz_ds['environmental_pressure'] = xr.full_like(chaz_ds['pres'], DEF_ENV_PRESSURE) + chaz_ds["track_name"] = ("id", [f"{fname}-{track_id.item()[1]}-{track_id.item()[0]}" + for track_id in chaz_ds['id']]) + + # add tracks one by one + last_perc = 0 + for cnt, i_track in enumerate(chaz_ds['id_no']): + perc = 100 * cnt / chaz_ds['id_no'].size + if perc - last_perc >= 10: + LOGGER.info("Progress: %d%%", perc) + last_perc = perc + track_ds = chaz_ds.sel(id=i_track['id'].item()) + track_ds = track_ds.sel(lifelength=track_ds['valid_t'].data) + data.append(xr.Dataset({ + 'time_step': ('time', track_ds['time_step'].values), + 'max_sustained_wind': ('time', track_ds['Mwspd'].values), + 'central_pressure': ('time', track_ds['pres'].values), + 'radius_max_wind': ('time', track_ds['radius_max_wind'].values), + 'environmental_pressure': ('time', track_ds['environmental_pressure'].values), + 'basin': ('time', np.full(track_ds['time'].size, "GB", dtype=" 0: - # The legacy format only has data in the subgroups, not in the root. - # => This cannot be the legacy file format! - return - # After this line, it is sure that the format is not supported (since there is no data in the - # root group). Before raising an exception, we double-check if it is the legacy format. - try: - # Check if the file has groups 'track{i}' by trying to access the first group. - with xr.open_dataset(file_name, group="track0") as ds_track: - # Check if the group actually contains track data: - is_legacy = ( - "time" in ds_track.dims - and "max_sustained_wind" in ds_track.variables - ) - except OSError as err: - if "group not found" in str(err): - # No 'track0' group. => This cannot be the legacy file format! - is_legacy = False - else: - # A different (unexpected) error occurred. => Re-raise the exception. - raise + with xr.open_dataset(file_name) as test_ds: + if len(test_ds.dims) > 0: + # The legacy format only has data in the subgroups, not in the root. + # => This cannot be the legacy file format! + return + # After this line, it is sure that the format is not supported (since there is no data in the + # root group). Before raising an exception, we double-check if it is the legacy format. + try: + # Check if the file has groups 'track{i}' by trying to access the first group. 
+            with xr.open_dataset(file_name, group="track0") as ds_track:
+                # Check if the group actually contains track data:
+                is_legacy = (
+                    "time" in ds_track.dims
+                    and "max_sustained_wind" in ds_track.variables
+                )
+        except OSError as err:
+            if "group not found" in str(err):
+                # No 'track0' group. => This cannot be the legacy file format!
+                is_legacy = False
+            else:
+                # A different (unexpected) error occurred. => Re-raise the exception.
+                raise
 
     raise ValueError(
         (
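Read in isolation, the probing logic above amounts to the following sketch (the helper name is assumed; `xarray` with the netCDF4 backend reports a missing group as an `OSError`):

```python
import xarray as xr

def _has_legacy_track0(file_name):
    """Illustrative helper: check for a 'track0' group holding track data."""
    try:
        with xr.open_dataset(file_name, group="track0") as ds_track:
            return "time" in ds_track.dims and "max_sustained_wind" in ds_track.variables
    except OSError as err:
        if "group not found" in str(err):
            return False  # no such group, so not the legacy format
        raise  # anything else is unexpected and should propagate
```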
@@ -2196,81 +2195,80 @@ def ibtracs_fit_param(explained, explanatory, year_range=(1980, 2019), order=1):
 
     # load ibtracs dataset
     fn_nc = SYSTEM_DIR.joinpath('IBTrACS.ALL.v04r00.nc')
-    ibtracs_ds = xr.open_dataset(fn_nc)
-
-    # choose specified year range
-    years = ibtracs_ds.sid.str.slice(0, 4).astype(int)
-    match = (years >= year_range[0]) & (years <= year_range[1])
-    ibtracs_ds = ibtracs_ds.sel(storm=match)
-
-    if "wind" in variables:
-        for agency in IBTRACS_AGENCIES:
-            scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency]
-            ibtracs_ds[f'{agency}_wind'] -= shift
-            ibtracs_ds[f'{agency}_wind'] /= scale
-
-    # fill values
-    agency_pref, track_agency_ix = ibtracs_track_agency(ibtracs_ds)
-    for var in wmo_vars:
-        if var not in variables:
-            continue
-        # array of values in order of preference
-        cols = [f'{a}_{var}' for a in agency_pref]
-        cols = [col for col in cols if col in ibtracs_ds.data_vars.keys()]
-        all_vals = ibtracs_ds[cols].to_array(dim='agency')
-        preferred_ix = all_vals.notnull().argmax(dim='agency')
-        if var in ['wind', 'pres']:
-            # choice: wmo -> wmo_agency/usa_agency -> preferred
-            ibtracs_ds[var] = ibtracs_ds['wmo_' + var] \
-                .fillna(all_vals.isel(agency=track_agency_ix)) \
-                .fillna(all_vals.isel(agency=preferred_ix))
-        else:
-            ibtracs_ds[var] = all_vals.isel(agency=preferred_ix)
-    fit_df = pd.DataFrame({var: ibtracs_ds[var].values.ravel() for var in variables})
-    fit_df = fit_df.dropna(axis=0, how='any').reset_index(drop=True)
-    if 'lat' in explanatory:
-        fit_df['lat'] = fit_df['lat'].abs()
-
-    # prepare explanatory variables
-    d_explanatory = fit_df[explanatory]
-    if isinstance(order, int):
-        order = (order,) * len(explanatory)
-    add_const = False
-    for ex, max_o in zip(explanatory, order):
-        if isinstance(max_o, tuple):
-            if fit_df[ex].min() > max_o[0]:
-                print(f"Minimum data value is {fit_df[ex].min()} > {max_o[0]}.")
-            if fit_df[ex].max() < max_o[-1]:
-                print(f"Maximum data value is {fit_df[ex].max()} < {max_o[-1]}.")
-            # piecewise linear with given break points
-            d_explanatory = d_explanatory.drop(labels=[ex], axis=1)
-            for i, max_o_i in enumerate(max_o):
-                col = f'{ex}{max_o_i}'
-                slope_0 = 1. / (max_o_i - max_o[i - 1]) if i > 0 else 0
-                slope_1 = 1. / (max_o[i + 1] - max_o_i) if i + 1 < len(max_o) else 0
-                d_explanatory[col] = np.fmax(0, (1 - slope_0 * np.fmax(0, max_o_i - fit_df[ex])
-                                                   - slope_1 * np.fmax(0, fit_df[ex] - max_o_i)))
-        elif max_o < 0:
-            d_explanatory = d_explanatory.drop(labels=[ex], axis=1)
-            for order in range(1, abs(max_o) + 1):
-                d_explanatory[f'{ex}^{-order}'] = fit_df[ex]**(-order)
-            add_const = True
-        else:
-            for order in range(2, max_o + 1):
-                d_explanatory[f'{ex}^{order}'] = fit_df[ex]**order
-            add_const = True
-    d_explained = fit_df[[explained]]
-    if add_const:
-        d_explanatory['const'] = 1.0
+    with xr.open_dataset(fn_nc) as ibtracs_ds:
+        # choose specified year range
+        years = ibtracs_ds.sid.str.slice(0, 4).astype(int)
+        match = (years >= year_range[0]) & (years <= year_range[1])
+        ibtracs_ds = ibtracs_ds.sel(storm=match)
+
+        if "wind" in variables:
+            for agency in IBTRACS_AGENCIES:
+                scale, shift = IBTRACS_AGENCY_1MIN_WIND_FACTOR[agency]
+                ibtracs_ds[f'{agency}_wind'] -= shift
+                ibtracs_ds[f'{agency}_wind'] /= scale
+
+        # fill values
+        agency_pref, track_agency_ix = ibtracs_track_agency(ibtracs_ds)
+        for var in wmo_vars:
+            if var not in variables:
+                continue
+            # array of values in order of preference
+            cols = [f'{a}_{var}' for a in agency_pref]
+            cols = [col for col in cols if col in ibtracs_ds.data_vars.keys()]
+            all_vals = ibtracs_ds[cols].to_array(dim='agency')
+            preferred_ix = all_vals.notnull().argmax(dim='agency')
+            if var in ['wind', 'pres']:
+                # choice: wmo -> wmo_agency/usa_agency -> preferred
+                ibtracs_ds[var] = ibtracs_ds['wmo_' + var] \
+                    .fillna(all_vals.isel(agency=track_agency_ix)) \
+                    .fillna(all_vals.isel(agency=preferred_ix))
+            else:
+                ibtracs_ds[var] = all_vals.isel(agency=preferred_ix)
+        fit_df = pd.DataFrame({var: ibtracs_ds[var].values.ravel() for var in variables})
+        fit_df = fit_df.dropna(axis=0, how='any').reset_index(drop=True)
+        if 'lat' in explanatory:
+            fit_df['lat'] = fit_df['lat'].abs()
+
+        # prepare explanatory variables
+        d_explanatory = fit_df[explanatory]
+        if isinstance(order, int):
+            order = (order,) * len(explanatory)
+        add_const = False
+        for ex, max_o in zip(explanatory, order):
+            if isinstance(max_o, tuple):
+                if fit_df[ex].min() > max_o[0]:
+                    print(f"Minimum data value is {fit_df[ex].min()} > {max_o[0]}.")
+                if fit_df[ex].max() < max_o[-1]:
+                    print(f"Maximum data value is {fit_df[ex].max()} < {max_o[-1]}.")
+                # piecewise linear with given break points
+                d_explanatory = d_explanatory.drop(labels=[ex], axis=1)
+                for i, max_o_i in enumerate(max_o):
+                    col = f'{ex}{max_o_i}'
+                    slope_0 = 1. / (max_o_i - max_o[i - 1]) if i > 0 else 0
+                    slope_1 = 1. / (max_o[i + 1] - max_o_i) if i + 1 < len(max_o) else 0
+                    d_explanatory[col] = np.fmax(0, (1 - slope_0 * np.fmax(0, max_o_i - fit_df[ex])
+                                                       - slope_1 * np.fmax(0, fit_df[ex] - max_o_i)))
+            elif max_o < 0:
+                d_explanatory = d_explanatory.drop(labels=[ex], axis=1)
+                for order in range(1, abs(max_o) + 1):
+                    d_explanatory[f'{ex}^{-order}'] = fit_df[ex]**(-order)
+                add_const = True
+            else:
+                for order in range(2, max_o + 1):
+                    d_explanatory[f'{ex}^{order}'] = fit_df[ex]**order
+                add_const = True
+        d_explained = fit_df[[explained]]
+        if add_const:
+            d_explanatory['const'] = 1.0
 
-    # run statistical fit
-    sm_results = sm.OLS(d_explained, d_explanatory).fit()
+        # run statistical fit
+        sm_results = sm.OLS(d_explained, d_explanatory).fit()
 
-    # print results
-    print(sm_results.params)
-    print("r^2:", sm_results.rsquared)
+        # print results
+        print(sm_results.params)
+        print("r^2:", sm_results.rsquared)
 
-    return sm_results
+        return sm_results
 
 def ibtracs_track_agency(ds_sel):
     """Get preferred IBTrACS agency for each entry in the dataset.
diff --git a/doc/guide/Guide_Py_Performance.ipynb b/doc/guide/Guide_Py_Performance.ipynb
index db700f168..bb3cf209f 100644
--- a/doc/guide/Guide_Py_Performance.ipynb
+++ b/doc/guide/Guide_Py_Performance.ipynb
@@ -1091,12 +1091,16 @@
     "\n",
     "💡 **`xarray` allows to read the shape and type of variables contained in the dataset without loading any of the actual data into memory.**\n",
     "\n",
-    "Furthermore, when loading slices and arithmetically aggregating variables, memory is allocated not more than necessary, but values are obtained on-the-fly from the file."
+    "Furthermore, when loading slices and arithmetically aggregating variables, memory is allocated not more than necessary, but values are obtained on-the-fly from the file.\n",
+    "\n",
+    "⚠️ Note that opening a dataset should be done with a context manager to ensure that the file is properly closed:\n",
+    "\n",
+    "`with xr.open_dataset(\"saved_on_disk.nc\") as ds:`"
    ]
   },
   {
    "cell_type": "markdown",
-   "id": "0ca35e9e",
+   "id": "d516b06f",
    "metadata": {
     "slideshow": {
      "slide_type": "slide"
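The notebook's one-line advice can be rounded out as follows (the file name is the notebook's own example; the variable name `t2m` is assumed):

```python
import xarray as xr

with xr.open_dataset("saved_on_disk.nc") as ds:
    # Slices and aggregations remain lazy and memory-friendly here ...
    monthly = ds["t2m"].groupby("time.month").mean()
    # ... but load anything needed later, before the file is closed.
    monthly.load()
```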