From 084668c38de1a1ff87977dc234d8e551b8655d3d Mon Sep 17 00:00:00 2001 From: Emily Soth Date: Thu, 29 Feb 2024 14:34:46 -0800 Subject: [PATCH] use routing wrapper in SWY local recharge function --- .../seasonal_water_yield_core.pyx | 273 +++++++----------- 1 file changed, 106 insertions(+), 167 deletions(-) diff --git a/src/natcap/invest/seasonal_water_yield/seasonal_water_yield_core.pyx b/src/natcap/invest/seasonal_water_yield/seasonal_water_yield_core.pyx index 5bb4b3e1c1..45bbb89624 100644 --- a/src/natcap/invest/seasonal_water_yield/seasonal_water_yield_core.pyx +++ b/src/natcap/invest/seasonal_water_yield/seasonal_water_yield_core.pyx @@ -32,7 +32,7 @@ LOGGER = logging.getLogger(__name__) cdef int N_MONTHS = 12 -cpdef calculate_local_recharge( +def calculate_local_recharge( precip_path_list, et0_path_list, qf_m_path_list, flow_dir_mfd_path, kc_path_list, alpha_month_map, float beta_i, float gamma, stream_path, target_li_path, target_li_avail_path, target_l_sum_avail_path, @@ -76,35 +76,6 @@ cpdef calculate_local_recharge( None. """ - cdef int i_n, flow_dir_nodata, flow_dir_mfd - cdef int peak_pixel - cdef long xs, ys, xs_root, ys_root, xoff, yoff - cdef int flow_dir_s - cdef long xi, yi, xj, yj - cdef int flow_dir_j, p_ij_base - cdef int n_dir - cdef long raster_x_size, raster_y_size, win_xsize, win_ysize - cdef double pet_m, p_m, qf_m, et0_m, aet_i, p_i, qf_i, l_i, l_avail_i - cdef float qf_nodata, kc_nodata - - cdef float mfd_direction_array[8] - - cdef queue[pair[long, long]] work_queue - cdef _ManagedRaster et0_m_raster, qf_m_raster, kc_m_raster - - cdef numpy.ndarray[numpy.npy_float32, ndim=1] alpha_month_array = ( - numpy.array( - [x[1] for x in sorted(alpha_month_map.items())], - dtype=numpy.float32)) - - # used for time-delayed logging - cdef time_t last_log_time - last_log_time = ctime(NULL) - - # we know the PyGeoprocessing MFD raster flow dir type is a 32 bit int. - flow_dir_raster_info = pygeoprocessing.get_raster_info(flow_dir_mfd_path) - flow_dir_nodata = flow_dir_raster_info['nodata'][0] - raster_x_size, raster_y_size = flow_dir_raster_info['raster_size'] cdef ManagedFlowDirRaster flow_raster = ManagedFlowDirRaster( flow_dir_mfd_path, 1, 0) @@ -113,36 +84,26 @@ cpdef calculate_local_recharge( # precipitation and evapotranspiration data should # always be non-negative et0_m_raster_list = [] - et0_m_nodata_list = [] for et0_path in et0_path_list: - et0_m_raster_list.append(_ManagedRaster(et0_path, 1, 0)) - nodata = pygeoprocessing.get_raster_info(et0_path)['nodata'][0] - if nodata is None: - nodata = -1 - et0_m_nodata_list.append(nodata) + et0_m_raster = _ManagedRaster(et0_path, 1, 0) + et0_m_raster_list.append(et0_m_raster) + if et0_m_raster.nodata is None: + et0_m_raster.nodata = -1 precip_m_raster_list = [] - precip_m_nodata_list = [] for precip_m_path in precip_path_list: - precip_m_raster_list.append(_ManagedRaster(precip_m_path, 1, 0)) - nodata = pygeoprocessing.get_raster_info(precip_m_path)['nodata'][0] - if nodata is None: - nodata = -1 - precip_m_nodata_list.append(nodata) + precip_m_raster = _ManagedRaster(precip_m_path, 1, 0) + precip_m_raster_list.append(precip_m_raster) + if precip_m_raster.nodata is None: + precip_m_raster.nodata = -1 qf_m_raster_list = [] - qf_m_nodata_list = [] for qf_m_path in qf_m_path_list: qf_m_raster_list.append(_ManagedRaster(qf_m_path, 1, 0)) - qf_m_nodata_list.append( - pygeoprocessing.get_raster_info(qf_m_path)['nodata'][0]) kc_m_raster_list = [] - kc_m_nodata_list = [] for kc_m_path in kc_path_list: kc_m_raster_list.append(_ManagedRaster(kc_m_path, 1, 0)) - kc_m_nodata_list.append( - pygeoprocessing.get_raster_info(kc_m_path)['nodata'][0]) target_nodata = -1e32 pygeoprocessing.new_raster_from_base( @@ -175,126 +136,104 @@ cpdef calculate_local_recharge( cdef _ManagedRaster target_pi_raster = _ManagedRaster( target_pi_path, 1, 1) + def seed_fn(x, y): + return flow_raster.is_local_high_point(x, y) - for offset_dict in pygeoprocessing.iterblocks( - (flow_dir_mfd_path, 1), offset_only=True, largest_block=0): - # use cython variables to avoid python overhead of dict values - win_xsize = offset_dict['win_xsize'] - win_ysize = offset_dict['win_ysize'] - xoff = offset_dict['xoff'] - yoff = offset_dict['yoff'] - if ctime(NULL) - last_log_time > 5.0: - last_log_time = ctime(NULL) - current_pixel = xoff + yoff * raster_x_size - LOGGER.info( - 'peak point detection %.2f%% complete', - 100.0 * current_pixel / ( - raster_x_size * raster_y_size)) - - # search block for a peak pixel where no other pixel drains to it. - for ys in xrange(win_ysize): - ys_root = yoff + ys - for xs in xrange(win_xsize): - xs_root = xoff + xs - - if flow_raster.is_local_high_point(xs_root, ys_root): - work_queue.push( - pair[long, long](xs_root, ys_root)) - - while work_queue.size() > 0: - xi = work_queue.front().first - yi = work_queue.front().second - work_queue.pop() - - l_sum_avail_i = target_l_sum_avail_raster.get(xi, yi) - if not is_close(l_sum_avail_i, target_nodata): - # already defined - continue - - # Equation 7, calculate L_sum_avail_i if possible, skip - # otherwise - upslope_defined = 1 - # initialize to 0 so we indicate we haven't tracked any - # mfd values yet - l_sum_avail_i = 0 - for neighbor in flow_raster.get_upslope_neighbors(xi, yi): - # pixel flows inward, check upslope - l_sum_avail_j = target_l_sum_avail_raster.get( - neighbor.x, neighbor.y) - if is_close(l_sum_avail_j, target_nodata): - upslope_defined = 0 - break - l_avail_j = target_li_avail_raster.get( - neighbor.x, neighbor.y) - # A step of Equation 7 - l_sum_avail_i += ( - l_sum_avail_j + l_avail_j) * neighbor.flow_proportion - # calculate l_sum_avail_i by summing all the valid - # directions then normalizing by the sum of the mfd - # direction weights (Equation 8) - if upslope_defined: - # Equation 7 - target_l_sum_avail_raster.set(xi, yi, l_sum_avail_i) - else: - # if not defined, we'll get it on another pass - continue - - aet_i = 0 - p_i = 0 - qf_i = 0 - - for m_index in range(12): - precip_m_raster = ( - <_ManagedRaster?>precip_m_raster_list[m_index]) - qf_m_raster = ( - <_ManagedRaster?>qf_m_raster_list[m_index]) - et0_m_raster = ( - <_ManagedRaster?>et0_m_raster_list[m_index]) - kc_m_raster = ( - <_ManagedRaster?>kc_m_raster_list[m_index]) - - et0_nodata = et0_m_nodata_list[m_index] - precip_nodata = precip_m_nodata_list[m_index] - qf_nodata = qf_m_nodata_list[m_index] - kc_nodata = kc_m_nodata_list[m_index] - - p_m = precip_m_raster.get(xi, yi) - if not is_close(p_m, precip_nodata): - p_i += p_m - else: - p_m = 0 - - qf_m = qf_m_raster.get(xi, yi) - if not is_close(qf_m, qf_nodata): - qf_i += qf_m - else: - qf_m = 0 - - kc_m = kc_m_raster.get(xi, yi) - pet_m = 0 - et0_m = et0_m_raster.get(xi, yi) - if not ( - is_close(kc_m, kc_nodata) or - is_close(et0_m, et0_nodata)): - # Equation 6 - pet_m = kc_m * et0_m - - # Equation 4/5 - aet_i += min( - pet_m, - p_m - qf_m + - alpha_month_array[m_index]*beta_i*l_sum_avail_i) - - l_i = (p_i - qf_i - aet_i) - l_avail_i = min(gamma * l_i, l_i) - - target_pi_raster.set(xi, yi, p_i) - target_aet_raster.set(xi, yi, aet_i) - target_li_raster.set(xi, yi, l_i) - target_li_avail_raster.set(xi, yi, l_avail_i) - - for neighbor in flow_raster.get_downslope_neighbors(xi, yi): - work_queue.push(pair[long, long](neighbor.x, neighbor.y)) + def route_fn(xi, yi): + cdef double pet_m, p_m, qf_m, et0_m, aet_i, p_i, qf_i, l_i, l_avail_i + cdef _ManagedRaster et0_m_raster, qf_m_raster, kc_m_raster + cdef numpy.ndarray[numpy.npy_float32, ndim=1] alpha_month_array = ( + numpy.array( + [x[1] for x in sorted(alpha_month_map.items())], + dtype=numpy.float32)) + l_sum_avail_i = target_l_sum_avail_raster.get(xi, yi) + if not is_close(l_sum_avail_i, target_l_sum_avail_raster.nodata): + # already defined + return [] + + # Equation 7, calculate L_sum_avail_i if possible, skip + # otherwise + upslope_defined = 1 + # initialize to 0 so we indicate we haven't tracked any + # mfd values yet + l_sum_avail_i = 0 + for neighbor in flow_raster.get_upslope_neighbors(xi, yi): + # pixel flows inward, check upslope + l_sum_avail_j = target_l_sum_avail_raster.get( + neighbor.x, neighbor.y) + if is_close(l_sum_avail_j, target_l_sum_avail_raster.nodata): + upslope_defined = 0 + break + l_avail_j = target_li_avail_raster.get( + neighbor.x, neighbor.y) + # A step of Equation 7 + l_sum_avail_i += ( + l_sum_avail_j + l_avail_j) * neighbor.flow_proportion + # calculate l_sum_avail_i by summing all the valid + # directions then normalizing by the sum of the mfd + # direction weights (Equation 8) + if upslope_defined: + # Equation 7 + target_l_sum_avail_raster.set(xi, yi, l_sum_avail_i) + else: + # if not defined, we'll get it on another pass + return [] + + aet_i = 0 + p_i = 0 + qf_i = 0 + + for m_index in range(12): + precip_m_raster = ( + <_ManagedRaster?>precip_m_raster_list[m_index]) + qf_m_raster = ( + <_ManagedRaster?>qf_m_raster_list[m_index]) + et0_m_raster = ( + <_ManagedRaster?>et0_m_raster_list[m_index]) + kc_m_raster = ( + <_ManagedRaster?>kc_m_raster_list[m_index]) + + p_m = precip_m_raster.get(xi, yi) + if not is_close(p_m, precip_m_raster.nodata): + p_i += p_m + else: + p_m = 0 + + qf_m = qf_m_raster.get(xi, yi) + if not is_close(qf_m, qf_m_raster.nodata): + qf_i += qf_m + else: + qf_m = 0 + + kc_m = kc_m_raster.get(xi, yi) + pet_m = 0 + et0_m = et0_m_raster.get(xi, yi) + if not ( + is_close(kc_m, kc_m_raster.nodata) or + is_close(et0_m, et0_m_raster.nodata)): + # Equation 6 + pet_m = kc_m * et0_m + + # Equation 4/5 + aet_i += min( + pet_m, + p_m - qf_m + + alpha_month_array[m_index]*beta_i*l_sum_avail_i) + + l_i = (p_i - qf_i - aet_i) + l_avail_i = min(gamma * l_i, l_i) + + target_pi_raster.set(xi, yi, p_i) + target_aet_raster.set(xi, yi, aet_i) + target_li_raster.set(xi, yi, l_i) + target_li_avail_raster.set(xi, yi, l_avail_i) + + to_push = [] + for neighbor in flow_raster.get_downslope_neighbors(xi, yi): + to_push.append(neighbor.y * flow_raster.raster_x_size + neighbor.x) + return to_push + + route(flow_dir_mfd_path, seed_fn, route_fn) def route_baseflow_sum(