Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mutually mask SDR inputs to avoid problems routing #1435

Merged
merged 10 commits into from
Nov 17, 2023
6 changes: 6 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ Unreleased Changes
condition when run with ``n_workers > 0``.
https://github.com/natcap/invest/issues/1426
* SDR
* Fixed an issue with SDR's sediment deposition where large regions would
become nodata in cases where the DEM has valid data but other inputs
(LULC, erosivity, erodibility) did not have valid pixels. Now, all
raster inputs are mutually masked so that only those pixel stacks
continue through to the model where all pixels in the stack are
non-nodata. (`#911 <https://github.com/natcap/invest/issues/911>`_)
* RKLS, USLE, avoided erosion, and avoided export rasters will now have
nodata in streams (`#1415 <https://github.com/natcap/invest/issues/1415>`_)
* Wind Energy
Expand Down
125 changes: 106 additions & 19 deletions src/natcap/invest/sdr/sdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,12 +390,62 @@
},
"aligned_lulc.tif": {
"about": gettext(
"Copy of the input drainage map, clipped to "
"Copy of the input Land Use Land Cover map, clipped to "
"the extent of the other raster inputs and "
"aligned to the DEM."),
"bands": {1: {"type": "integer"}},
},
"mask.tif": {
"about": gettext(
"A raster aligned to the DEM and clipped to the "
"extent of the other raster inputs. Pixel values "
"indicate where a nodata value exists in the stack "
"of aligned rasters (pixel value of 0), or if all "
"values in the stack of rasters at this pixel "
"location are valid."),
"bands": {1: {"type": "integer"}},
},
"masked_dem.tif": {
"about": gettext(
"A copy of the aligned DEM, masked using the mask raster."
),
"bands": {1: {
"type": "number",
"units": u.meter}},
},
"masked_drainage.tif": {
"about": gettext(
"A copy of the aligned drainage map, masked using the "
"mask raster."
),
"bands": {1: {"type": "integer"}}
},
"masked_erodibility.tif": {
"about": gettext(
"A copy of the aligned erodibility map, masked using "
"the mask raster."
),
"bands": {1: {
"type": "number",
"units": u.metric_ton*u.hectare*u.hour/(u.hectare*u.megajoule*u.millimeter)
}},
},
"masked_erosivity.tif": {
"about": gettext(
"A copy of the aligned erosivity map, masked using "
"the mask raster."),
"bands": {1: {
"type": "number",
"units": u.megajoule*u.millimeter/(u.hectare*u.hour*u.year)
}},
},
"masked_lulc.tif": {
"about": gettext(
"A copy of the aligned Land Use Land Cover map, "
"masked using the mask raster."),
"bands": {1: {"type": "integer"}},
}
}
},
},
"taskgraph_cache": spec_utils.TASKGRAPH_DIR
}
Expand All @@ -421,6 +471,12 @@
'aligned_erodibility_path': 'aligned_erodibility.tif',
'aligned_erosivity_path': 'aligned_erosivity.tif',
'aligned_lulc_path': 'aligned_lulc.tif',
'mask_path': 'mask.tif',
'masked_dem_path': 'masked_dem.tif',
'masked_drainage_path': 'masked_drainage.tif',
'masked_erodibility_path': 'masked_erodibility.tif',
'masked_erosivity_path': 'masked_erosivity.tif',
'masked_lulc_path': 'masked_lulc.tif',
'cp_factor_path': 'cp.tif',
'd_dn_path': 'd_dn.tif',
'd_up_path': 'd_up.tif',
Expand Down Expand Up @@ -529,17 +585,22 @@ def execute(args):

base_list = []
aligned_list = []
for file_key in ['dem', 'lulc', 'erosivity', 'erodibility']:
base_list.append(args[file_key + "_path"])
aligned_list.append(f_reg["aligned_" + file_key + "_path"])
# all continuous rasters can use bilinaer, but lulc should be mode
masked_list = []
input_raster_key_list = ['dem', 'lulc', 'erosivity', 'erodibility']
for file_key in input_raster_key_list:
base_list.append(args[f"{file_key}_path"])
aligned_list.append(f_reg[f"aligned_{file_key}_path"])
masked_list.append(f_reg[f"masked_{file_key}_path"])
# all continuous rasters can use bilinear, but lulc should be mode
interpolation_list = ['bilinear', 'mode', 'bilinear', 'bilinear']

drainage_present = False
if 'drainage_path' in args and args['drainage_path'] != '':
drainage_present = True
input_raster_key_list.append('drainage')
base_list.append(args['drainage_path'])
aligned_list.append(f_reg['aligned_drainage_path'])
masked_list.append(f_reg['masked_drainage_path'])
interpolation_list.append('near')

dem_raster_info = pygeoprocessing.get_raster_info(args['dem_path'])
Expand All @@ -562,13 +623,39 @@ def execute(args):
target_path_list=aligned_list,
task_name='align input rasters')

mutual_mask_task = task_graph.add_task(
func=pygeoprocessing.raster_map,
kwargs={
'op': lambda *x: 1, # any valid pixel gets a value of 1
'rasters': aligned_list,
'target_path': f_reg['mask_path'],
'target_nodata': 0,
},
target_path_list=[f_reg['mask_path']],
dependent_task_list=[align_task],
task_name='create mask')

mask_tasks = {} # use a dict so we can put these in a loop
for key, aligned_path, masked_path in zip(input_raster_key_list,
aligned_list, masked_list):
mask_tasks[f"masked_{key}"] = task_graph.add_task(
func=pygeoprocessing.raster_map,
kwargs={
'op': lambda array, mask: array,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't used raster_map much yet, but this is cool to see!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely! Masking is such an easy thing to express within the context of raster_map, it was just too good to pass up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, using a lambda like this will break when n_workers > 0. I just discovered this issue while working on #1437. Because lambdas can't be pickled, they can't be directly passed into a taskgraph Task. op has to be a named function defined at the top level of the module.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, of course! Thanks for the reminder. Patched.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phargogh should this be _mask_op now in kwargs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yes, I missed one lambda and patched it in f33cf70

'rasters': [aligned_path, f_reg['mask_path']],
'target_path': masked_path,
},
target_path_list=[masked_path],
dependent_task_list=[mutual_mask_task, align_task],
task_name=f'mask {key}')

pit_fill_task = task_graph.add_task(
func=pygeoprocessing.routing.fill_pits,
args=(
(f_reg['aligned_dem_path'], 1),
(f_reg['masked_dem_path'], 1),
f_reg['pit_filled_dem_path']),
target_path_list=[f_reg['pit_filled_dem_path']],
dependent_task_list=[align_task],
dependent_task_list=[mask_tasks['masked_dem']],
task_name='fill pits')

slope_task = task_graph.add_task(
Expand Down Expand Up @@ -633,10 +720,10 @@ def execute(args):
drainage_task = task_graph.add_task(
func=_add_drainage(
f_reg['stream_path'],
f_reg['aligned_drainage_path'],
f_reg['masked_drainage_path'],
f_reg['stream_and_drainage_path']),
target_path_list=[f_reg['stream_and_drainage_path']],
dependent_task_list=[stream_task, align_task],
dependent_task_list=[stream_task, mask_tasks['masked_drainage']],
task_name='add drainage')
drainage_raster_path_task = (
f_reg['stream_and_drainage_path'], drainage_task)
Expand All @@ -648,33 +735,34 @@ def execute(args):
threshold_w_task = task_graph.add_task(
func=_calculate_w,
args=(
lulc_to_c, f_reg['aligned_lulc_path'], f_reg['w_path'],
lulc_to_c, f_reg['masked_lulc_path'], f_reg['w_path'],
f_reg['thresholded_w_path']),
target_path_list=[f_reg['w_path'], f_reg['thresholded_w_path']],
dependent_task_list=[align_task],
dependent_task_list=[mask_tasks['masked_lulc']],
task_name='calculate W')

lulc_to_cp = (biophysical_df['usle_c'] * biophysical_df['usle_p']).to_dict()
cp_task = task_graph.add_task(
func=_calculate_cp,
args=(
lulc_to_cp, f_reg['aligned_lulc_path'],
lulc_to_cp, f_reg['masked_lulc_path'],
f_reg['cp_factor_path']),
target_path_list=[f_reg['cp_factor_path']],
dependent_task_list=[align_task],
dependent_task_list=[mask_tasks['masked_lulc']],
task_name='calculate CP')

rkls_task = task_graph.add_task(
func=_calculate_rkls,
args=(
f_reg['ls_path'],
f_reg['aligned_erosivity_path'],
f_reg['aligned_erodibility_path'],
f_reg['masked_erosivity_path'],
f_reg['masked_erodibility_path'],
drainage_raster_path_task[0],
f_reg['rkls_path']),
target_path_list=[f_reg['rkls_path']],
dependent_task_list=[
align_task, drainage_raster_path_task[1], ls_factor_task],
mask_tasks['masked_erosivity'], mask_tasks['masked_erodibility'],
drainage_raster_path_task[1], ls_factor_task],
task_name='calculate RKLS')

usle_task = task_graph.add_task(
Expand Down Expand Up @@ -705,8 +793,7 @@ def execute(args):
accumulation_path, out_bar_path),
target_path_list=[accumulation_path, out_bar_path],
dependent_task_list=[
align_task, factor_task, flow_accumulation_task,
flow_dir_task],
factor_task, flow_accumulation_task, flow_dir_task],
task_name='calculate %s' % bar_id)
bar_task_map[bar_id] = bar_task

Expand Down
8 changes: 4 additions & 4 deletions tests/test_sdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,10 @@ def test_non_square_dem(self):
sdr.execute(args)

expected_results = {
'sed_export': 0.08896198869,
'usle_tot': 1.86480903625,
'avoid_exp': 9203.955078125,
'avoid_eros': 194212.28125,
'sed_export': 0.08894859254,
'usle_tot': 1.86440658569,
'avoid_exp': 9202.60546875,
'avoid_eros': 194171.578125,
}

vector_path = os.path.join(
Expand Down