From 5e75a52d13d094c5977762345106c3c96ceb1e7e Mon Sep 17 00:00:00 2001 From: Lifeng Ren Date: Thu, 8 Aug 2024 15:14:31 +0200 Subject: [PATCH] remove btc functions out of seals_core --- seals/seals_process_coarse_timeseries.py | 152 ----------------------- 1 file changed, 152 deletions(-) diff --git a/seals/seals_process_coarse_timeseries.py b/seals/seals_process_coarse_timeseries.py index 788f5c2..647dd6e 100644 --- a/seals/seals_process_coarse_timeseries.py +++ b/seals/seals_process_coarse_timeseries.py @@ -226,158 +226,6 @@ def lulc_as_coarse_states(p): hb.load_geotiff_chunk_by_bb(v, p.bb, output_path=hb.suri(v, 'aoi')) -def check_netcdf_file(nc_path, var_name): - """Check if the NetCDF file exists and contains the specified variable.""" - if not os.path.exists(nc_path): - print(f"File does not exist: {nc_path}") - return False - try: - with nc.Dataset(nc_path, 'r') as ds: - if var_name in ds.variables: - print(f"Variable '{var_name}' is present in the file.") - return True - else: - print(f"Variable '{var_name}' is not found. Available variables: {list(ds.variables.keys())}") - return False - except Exception as e: - print(f"Failed to read NetCDF file: {e}") - return False - -def interpolate_years(lower_data, upper_data, target_year, lower_year, upper_year): - """Interpolates data between two bands linearly based on the target year.""" - factor = (target_year - lower_year) / (upper_year - lower_year) - interpolated_data = lower_data + (upper_data - lower_data) * factor - return interpolated_data - -def load_correspondence_dict(correspondence_path): - """Load the correspondence dictionary from a CSV file.""" - df = pd.read_csv(correspondence_path) - correspondence_dict = {row['src_id']: row['src_label'] for index, row in df.iterrows()} - return correspondence_dict - -def extract_btc_netcdf(src_nc_path, output_dir, filter_dict, correspondence_dict): - """Extract specific data from a NetCDF file based on a filtering dictionary and save as GeoTIFFs.""" - var_name = 'LC_area_share' - if not check_netcdf_file(src_nc_path, var_name): - return - - full_src_nc_path = f'NETCDF:"{src_nc_path}":{var_name}' - ds = gdal.Open(full_src_nc_path) - if ds is None: - print(f"Failed to open source NetCDF file: {src_nc_path}") - return - - available_years = [2010 + i * 10 for i in range((2100 - 2010) // 10 + 1)] - acceptable_years = [int(year) for year in filter_dict['time']] - print(f"Years to filter by: {acceptable_years}") - - for target_year in acceptable_years: - time_dir = os.path.join(output_dir, f"time_{target_year}") - os.makedirs(time_dir, exist_ok=True) - - for lc_class in range(1, ds.RasterCount // len(available_years) + 1): - if target_year in available_years: - band_index = (available_years.index(target_year) * (ds.RasterCount // len(available_years))) + lc_class - band = ds.GetRasterBand(band_index) - data_array = band.ReadAsArray() - data_type = band.DataType - ndv = band.GetNoDataValue() - else: - lower_year = max([year for year in available_years if year < target_year], default=2010) - upper_year = min([year for year in available_years if year > target_year], default=2100) - lower_band_index = (available_years.index(lower_year) * (ds.RasterCount // len(available_years))) + lc_class - upper_band_index = (available_years.index(upper_year) * (ds.RasterCount // len(available_years))) + lc_class - lower_band = ds.GetRasterBand(lower_band_index) - upper_band = ds.GetRasterBand(upper_band_index) - lower_data = lower_band.ReadAsArray() - upper_data = upper_band.ReadAsArray() - data_array = interpolate_years(lower_data, upper_data, target_year, lower_year, upper_year) - data_type = lower_band.DataType # Assuming both bands have the same data type - ndv = lower_band.GetNoDataValue() # Assuming both bands have the same NoDataValue - print(f"Interpolated data for year {target_year}, class {lc_class}.") - - src_label = correspondence_dict.get(lc_class, f"lc_class_{lc_class}") - band_name = f"{src_label}.tif" - dst_file_path = os.path.join(time_dir, band_name) - - driver = gdal.GetDriverByName('GTiff') - out_ds = driver.Create(dst_file_path, ds.RasterXSize, ds.RasterYSize, 1, data_type) - if out_ds is None: - print(f"Failed to create output file: {dst_file_path}") - continue - - out_ds.GetRasterBand(1).WriteArray(data_array) - out_ds.GetRasterBand(1).SetNoDataValue(ndv) - out_ds.SetGeoTransform(ds.GetGeoTransform()) - out_ds.SetProjection('EPSG:4326') # Set the projection to WGS84 - - out_ds = None - print(f"Saved: {dst_file_path}") - -def coarse_extraction_btc(p): - """Create an empty folder dir to hold all coarse intermediate outputs, such as per-year changes in LU hectarage.""" - if p.run_this: - p.coarse_correspondence_dict = load_correspondence_dict(p.coarse_correspondence_path) - for index, row in list(p.scenarios_df.iterrows()): - seals_utils.assign_df_row_to_object_attributes(p, row) - hb.log('Extracting coarse states for scenario ' + str(index) + ' of ' + str(len(p.scenarios_df)) + ' with row ' + str([i for i in row])) - hb.debug('Analyzing row:\n' + str(row)) - - if p.scenario_type == 'baseline': - if hb.path_exists(os.path.join(p.input_dir, p.coarse_projections_input_path)): - src_nc_path = os.path.join(p.input_dir, p.coarse_projections_input_path) - elif hb.path_exists(os.path.join(p.base_data_dir, p.coarse_projections_input_path)): - src_nc_path = os.path.join(p.base_data_dir, p.coarse_projections_input_path) - else: - hb.log('Could not find ' + str(p.coarse_projections_input_path) + ' in either ' + str(p.input_dir) + ' or ' + str(p.base_data_dir)) - - dst_dir = os.path.join(p.cur_dir, p.exogenous_label, p.model_label) - - filter_dict = {'time': p.years} - # print(p.years) - # print(p.coarse_correspondence_dict) - - if not hb.path_exists(dst_dir): - extract_btc_netcdf(src_nc_path, dst_dir, filter_dict, p.coarse_correspondence_dict) - for year in p.years: - out_dst_dir = os.path.join(dst_dir, 'time_' + str(year)) - hb.create_directories(out_dst_dir) - - for lc_class_name in p.coarse_correspondence_dict.values(): - out_dst_lc_path = os.path.join(out_dst_dir, f"{lc_class_name}.tif") - hb.make_path_global_pyramid(out_dst_lc_path) - else: - if p.coarse_projections_input_path: - if hb.path_exists(os.path.join(p.input_dir, p.coarse_projections_input_path)): - src_nc_path = os.path.join(p.input_dir, p.coarse_projections_input_path) - elif hb.path_exists(os.path.join(p.base_data_dir, p.coarse_projections_input_path)): - src_nc_path = os.path.join(p.base_data_dir, p.coarse_projections_input_path) - else: - hb.log('No understandable input source.') - else: - hb.log('No coarse change listed') - - dst_dir = os.path.join(p.cur_dir, p.exogenous_label, p.climate_label, p.model_label, p.counterfactual_label) - filter_dict = {'time': p.years} - - # print(p.years) - # print(p.coarse_correspondence_dict) - - if not hb.path_exists(dst_dir): - extract_btc_netcdf(src_nc_path, dst_dir, filter_dict, p.coarse_correspondence_dict) - for year in p.years: - out_dst_dir = os.path.join(dst_dir, 'time_' + str(year)) - hb.create_directories(out_dst_dir) - - for lc_class_name in p.coarse_correspondence_dict.values(): - out_dst_lc_path = os.path.join(out_dst_dir, f"{lc_class_name}.tif") - hb.make_path_global_pyramid(out_dst_lc_path) - - p.coarse_extraction_dir = p.coarse_extraction_btc_dir - - - - def coarse_extraction(p): doc = """Create a empty folder dir. This will hold all of the coarse intermediate outputs, such as per-year changes in lu hectarage. Naming convention matches source. After reclassification this will be in destination conventions. """