diff --git a/src/pyfia/estimation/base.py b/src/pyfia/estimation/base.py index 5d7bb25..2a2be62 100644 --- a/src/pyfia/estimation/base.py +++ b/src/pyfia/estimation/base.py @@ -733,11 +733,14 @@ def _expand_plots_for_all_groups( tuple[pl.DataFrame, List[str]] Expanded plot data and valid group columns """ - # Get all plots from stratification + # Get all plots from stratification (include B&P columns when available) strat_data = self._get_stratification_data() - all_plots = ( - strat_data.select("PLT_CN", "STRATUM_CN", "EXPNS").unique().collect() - ) + strat_schema = strat_data.collect_schema().names() + bp_cols = ["ESTN_UNIT_CN", "STRATUM_WGT", "AREA_USED", "P2POINTCNT"] + select_cols = ["PLT_CN", "STRATUM_CN", "EXPNS"] + [ + c for c in bp_cols if c in strat_schema + ] + all_plots = strat_data.select(select_cols).unique().collect() valid_group_cols = [c for c in group_cols if c in plot_data.columns] @@ -1023,9 +1026,11 @@ def _calculate_variance_for_metrics( ) # Build plot-level aggregation - plot_agg_exprs = [ - pl.sum("CONDPROP_UNADJ").cast(pl.Float64).alias("x_i") - ] if "CONDPROP_UNADJ" in plot_cond_data.columns else [pl.lit(1.0).alias("x_i")] + plot_agg_exprs = ( + [pl.sum("CONDPROP_UNADJ").cast(pl.Float64).alias("x_i")] + if "CONDPROP_UNADJ" in plot_cond_data.columns + else [pl.lit(1.0).alias("x_i")] + ) for i in range(len(metric_configs)): plot_agg_exprs.append(pl.sum(f"y_{i}_ic").alias(f"y_{i}_i")) @@ -1034,9 +1039,13 @@ def _calculate_variance_for_metrics( # Step 3: Get ALL plots in the evaluation for proper variance calculation strat_data = self._get_stratification_data() - all_plots = ( - strat_data.select("PLT_CN", "STRATUM_CN", "EXPNS").unique().collect() - ) + strat_schema = strat_data.collect_schema().names() + # Select B&P variance columns when available + bp_cols = ["ESTN_UNIT_CN", "STRATUM_WGT", "AREA_USED", "P2POINTCNT"] + select_cols = ["PLT_CN", "STRATUM_CN", "EXPNS"] + [ + c for c in bp_cols if c in strat_schema + ] + all_plots = strat_data.select(select_cols).unique().collect() # Step 4: Calculate variance for each group or overall if group_cols: @@ -1074,8 +1083,11 @@ def _calculate_grouped_multi_metric_variance( Uses a loop over groups for now; could be further optimized with vectorized operations in the future. + + Uses ratio-of-means variance for per-acre SE: + V(R) = (1/X^2) * [V(Y) + R^2*V(X) - 2*R*Cov(Y,X)] """ - from .variance import calculate_domain_total_variance + from .variance import calculate_ratio_of_means_variance variance_results = [] @@ -1097,7 +1109,9 @@ def _calculate_grouped_multi_metric_variance( group_plot_data = plot_data.filter(group_filter) # Build select columns for join - select_cols = ["PLT_CN", "x_i"] + [f"y_{i}_i" for i in range(len(metric_configs))] + select_cols = ["PLT_CN", "x_i"] + [ + f"y_{i}_i" for i in range(len(metric_configs)) + ] select_cols = [c for c in select_cols if c in group_plot_data.columns] # Join with ALL plots, filling missing with zeros @@ -1120,29 +1134,26 @@ def _calculate_grouped_multi_metric_variance( result_row = dict(group_dict) if len(all_plots_group) > 0: - # Calculate total area for per-acre SE - total_area = ( - all_plots_group["EXPNS"] * all_plots_group["x_i"] - ).sum() - for i, cfg in enumerate(metric_configs): y_col = f"y_{i}_i" if y_col in all_plots_group.columns: - var_stats = calculate_domain_total_variance( - all_plots_group, y_col - ) - se_acre = ( - var_stats["se_total"] / total_area if total_area > 0 else 0.0 + ratio_stats = calculate_ratio_of_means_variance( + all_plots_group, y_col, "x_i" ) + se_acre = ratio_stats["se_ratio"] result_row[cfg["acre_se_col"]] = se_acre - result_row[cfg["total_se_col"]] = var_stats["se_total"] + result_row[cfg["total_se_col"]] = ratio_stats["se_total"] # Add variance columns if specified if "acre_var_col" in cfg: - result_row[cfg["acre_var_col"]] = se_acre**2 + result_row[cfg["acre_var_col"]] = ratio_stats[ + "variance_ratio" + ] if "total_var_col" in cfg: - result_row[cfg["total_var_col"]] = var_stats["variance_total"] + result_row[cfg["total_var_col"]] = ratio_stats[ + "variance_total" + ] else: # No data for this group for cfg in metric_configs: @@ -1159,7 +1170,9 @@ def _calculate_grouped_multi_metric_variance( if variance_results: var_df = pl.DataFrame(variance_results) # Use only valid group columns that exist in both dataframes - join_cols = [c for c in group_cols if c in var_df.columns and c in results.columns] + join_cols = [ + c for c in group_cols if c in var_df.columns and c in results.columns + ] if join_cols: results = results.join(var_df, on=join_cols, how="left") @@ -1174,11 +1187,16 @@ def _calculate_overall_multi_metric_variance( ) -> pl.DataFrame: """ Calculate overall variance (ungrouped) for multiple metrics. + + Uses ratio-of-means variance for per-acre SE: + V(R) = (1/X^2) * [V(Y) + R^2*V(X) - 2*R*Cov(Y,X)] """ - from .variance import calculate_domain_total_variance + from .variance import calculate_ratio_of_means_variance # Build select columns for join - select_cols = ["PLT_CN", "x_i"] + [f"y_{i}_i" for i in range(len(metric_configs))] + select_cols = ["PLT_CN", "x_i"] + [ + f"y_{i}_i" for i in range(len(metric_configs)) + ] select_cols = [c for c in select_cols if c in plot_data.columns] # Join with ALL plots, filling missing with zeros @@ -1197,34 +1215,39 @@ def _calculate_overall_multi_metric_variance( all_plots_with_values = all_plots_with_values.with_columns(fill_exprs) - # Calculate total area for per-acre SE - total_area = ( - all_plots_with_values["EXPNS"] * all_plots_with_values["x_i"] - ).sum() - # Calculate variance for each metric and add to results for i, cfg in enumerate(metric_configs): y_col = f"y_{i}_i" if y_col in all_plots_with_values.columns: - var_stats = calculate_domain_total_variance( - all_plots_with_values, y_col + ratio_stats = calculate_ratio_of_means_variance( + all_plots_with_values, y_col, "x_i" ) - se_acre = var_stats["se_total"] / total_area if total_area > 0 else 0.0 + se_acre = ratio_stats["se_ratio"] - results = results.with_columns([ - pl.lit(se_acre).alias(cfg["acre_se_col"]), - pl.lit(var_stats["se_total"]).alias(cfg["total_se_col"]), - ]) + results = results.with_columns( + [ + pl.lit(se_acre).alias(cfg["acre_se_col"]), + pl.lit(ratio_stats["se_total"]).alias(cfg["total_se_col"]), + ] + ) # Add variance columns if specified if "acre_var_col" in cfg: - results = results.with_columns([ - pl.lit(se_acre**2).alias(cfg["acre_var_col"]), - ]) + results = results.with_columns( + [ + pl.lit(ratio_stats["variance_ratio"]).alias( + cfg["acre_var_col"] + ), + ] + ) if "total_var_col" in cfg: - results = results.with_columns([ - pl.lit(var_stats["variance_total"]).alias(cfg["total_var_col"]), - ]) + results = results.with_columns( + [ + pl.lit(ratio_stats["variance_total"]).alias( + cfg["total_var_col"] + ), + ] + ) return results @@ -1242,21 +1265,25 @@ def _add_cv_columns( if acre_col and acre_col in results.columns: cv_col = acre_se_col.replace("_SE", "_CV") - results = results.with_columns([ - pl.when(pl.col(acre_col) > 0) - .then(pl.col(acre_se_col) / pl.col(acre_col) * 100) - .otherwise(None) - .alias(cv_col), - ]) + results = results.with_columns( + [ + pl.when(pl.col(acre_col) > 0) + .then(pl.col(acre_se_col) / pl.col(acre_col) * 100) + .otherwise(None) + .alias(cv_col), + ] + ) if total_col and total_col in results.columns: cv_col = total_se_col.replace("_SE", "_CV") - results = results.with_columns([ - pl.when(pl.col(total_col) > 0) - .then(pl.col(total_se_col) / pl.col(total_col) * 100) - .otherwise(None) - .alias(cv_col), - ]) + results = results.with_columns( + [ + pl.when(pl.col(total_col) > 0) + .then(pl.col(total_se_col) / pl.col(total_col) * 100) + .otherwise(None) + .alias(cv_col), + ] + ) return results diff --git a/src/pyfia/estimation/data_loading.py b/src/pyfia/estimation/data_loading.py index 5d160fc..e3c8503 100644 --- a/src/pyfia/estimation/data_loading.py +++ b/src/pyfia/estimation/data_loading.py @@ -590,17 +590,66 @@ def get_stratification_data(self) -> pl.LazyFrame: # when joining with other tables that also have STATECD, INVYR, etc. ppsa_selected = ppsa_unique.select(["PLT_CN", "STRATUM_CN"]) + # Load POP_ESTN_UNIT for exact B&P variance formula + if "POP_ESTN_UNIT" not in self.db.tables: + self.db.load_table("POP_ESTN_UNIT") + pop_estn_unit = self.db.tables["POP_ESTN_UNIT"] + if not isinstance(pop_estn_unit, pl.LazyFrame): + pop_estn_unit = pop_estn_unit.lazy() + + # Apply EVALID filter to POP_ESTN_UNIT + if self.db.evalid: + pop_estn_unit = pop_estn_unit.filter( + pl.col("EVALID").is_in(self.db.evalid) + ) + + # Deduplicate POP_ESTN_UNIT (same reason as POP_STRATUM) + pop_estn_unit_unique = pop_estn_unit.unique(subset=["CN"]) + + # Select columns from POP_ESTN_UNIT + pop_estn_unit_selected = pop_estn_unit_unique.select( + [ + pl.col("CN").alias("ESTN_UNIT_CN"), + "AREA_USED", + "P1PNTCNT_EU", + ] + ) + # Select necessary columns from POP_STRATUM + # Include ESTN_UNIT_CN, P1POINTCNT, P2POINTCNT for exact B&P variance pop_stratum_selected = pop_stratum_unique.select( [ pl.col("CN").alias("STRATUM_CN"), + "ESTN_UNIT_CN", "EXPNS", "ADJ_FACTOR_MICR", "ADJ_FACTOR_SUBP", "ADJ_FACTOR_MACR", + "P1POINTCNT", + "P2POINTCNT", ] ) + # Join POP_STRATUM with POP_ESTN_UNIT to get AREA_USED and P1PNTCNT_EU + pop_stratum_selected = pop_stratum_selected.join( + pop_estn_unit_selected, on="ESTN_UNIT_CN", how="left" + ) + + # Compute STRATUM_WGT = P1POINTCNT / P1PNTCNT_EU + # Guard against null or zero P1PNTCNT_EU to avoid inf/NaN propagation + pop_stratum_selected = pop_stratum_selected.with_columns( + pl.when( + pl.col("P1PNTCNT_EU").is_not_null() + & (pl.col("P1PNTCNT_EU").cast(pl.Float64) > 0) + ) + .then( + pl.col("P1POINTCNT").cast(pl.Float64) + / pl.col("P1PNTCNT_EU").cast(pl.Float64) + ) + .otherwise(0.0) + .alias("STRATUM_WGT") + ) + # Select MACRO_BREAKPOINT_DIA from PLOT table # This is CRITICAL for correct adjustment factor selection in states with macroplots plot_cols = [pl.col("CN").alias("PLT_CN"), "MACRO_BREAKPOINT_DIA"] diff --git a/src/pyfia/estimation/estimators/carbon_pools.py b/src/pyfia/estimation/estimators/carbon_pools.py index 5adb61c..cc9f3c5 100644 --- a/src/pyfia/estimation/estimators/carbon_pools.py +++ b/src/pyfia/estimation/estimators/carbon_pools.py @@ -22,7 +22,7 @@ from ..constants import LBS_TO_SHORT_TONS from ..tree_expansion import apply_tree_adjustment_factors from ..utils import validate_required_columns -from ..variance import calculate_domain_total_variance +from ..variance import calculate_ratio_of_means_variance class CarbonPoolEstimator(BaseEstimator): @@ -224,6 +224,8 @@ def calculate_variance(self, results: pl.DataFrame) -> pl.DataFrame: plot_group_cols = ["PLT_CN", "CONDID", "EXPNS"] if "STRATUM_CN" in self.plot_tree_data.columns: plot_group_cols.insert(2, "STRATUM_CN") + if "CONDPROP_UNADJ" in self.plot_tree_data.columns: + plot_group_cols.append("CONDPROP_UNADJ") # Add grouping columns if self.group_cols: @@ -248,26 +250,32 @@ def calculate_variance(self, results: pl.DataFrame) -> pl.DataFrame: [c for c in self.group_cols if c in plot_cond_data.columns] ) - plot_data = plot_cond_data.group_by(plot_level_cols).agg( - [ - pl.sum("y_carb_ic").alias("y_carb_i"), # Total carbon per plot - pl.lit(1.0).alias("x_i"), # Area proportion per plot (full plot = 1) - ] + # Include CONDPROP_UNADJ for correct area proportion (x_i) + condprop_col = ( + "CONDPROP_UNADJ" if "CONDPROP_UNADJ" in plot_cond_data.columns else None ) + agg_cols = [pl.sum("y_carb_ic").alias("y_carb_i")] + if condprop_col: + agg_cols.append(pl.sum(condprop_col).cast(pl.Float64).alias("x_i")) + else: + agg_cols.append(pl.lit(1.0).alias("x_i")) - # Step 3: Calculate variance for each group or overall - if self.group_cols: - # Get ALL plots in the evaluation for proper variance calculation - strat_data = self._get_stratification_data() - all_plots = ( - strat_data.select("PLT_CN", "STRATUM_CN", "EXPNS").unique().collect() - ) + plot_data = plot_cond_data.group_by(plot_level_cols).agg(agg_cols) + + # Step 3: Get ALL plots with B&P columns for proper variance calculation + strat_data = self._get_stratification_data() + strat_schema = strat_data.collect_schema().names() + bp_cols = ["ESTN_UNIT_CN", "STRATUM_WGT", "AREA_USED", "P2POINTCNT"] + select_cols = ["PLT_CN", "STRATUM_CN", "EXPNS"] + [ + c for c in bp_cols if c in strat_schema + ] + all_plots = strat_data.select(select_cols).unique().collect() - # Calculate variance for each group separately + # Step 4: Calculate variance using ratio-of-means + if self.group_cols: variance_results = [] for group_vals in results.iter_rows(): - # Build filter for this group group_filter = pl.lit(True) group_dict = {} @@ -278,10 +286,8 @@ def calculate_variance(self, results: pl.DataFrame) -> pl.DataFrame: pl.col(col) == group_vals[results.columns.index(col)] ) - # Filter plot data for this specific group group_plot_data = plot_data.filter(group_filter) - # Join with ALL plots, filling missing with zeros all_plots_group = all_plots.join( group_plot_data.select(["PLT_CN", "y_carb_i", "x_i"]), on="PLT_CN", @@ -294,21 +300,14 @@ def calculate_variance(self, results: pl.DataFrame) -> pl.DataFrame: ) if len(all_plots_group) > 0: - carb_stats = calculate_domain_total_variance( - all_plots_group, "y_carb_i" - ) - # Calculate total area for per-acre SE - total_area = ( - all_plots_group["EXPNS"] * all_plots_group["x_i"] - ).sum() - se_acre = ( - carb_stats["se_total"] / total_area if total_area > 0 else 0.0 + ratio_stats = calculate_ratio_of_means_variance( + all_plots_group, "y_carb_i", "x_i" ) variance_results.append( { **group_dict, - "CARBON_ACRE_SE": se_acre, - "CARBON_TOTAL_SE": carb_stats["se_total"], + "CARBON_ACRE_SE": ratio_stats["se_ratio"], + "CARBON_TOTAL_SE": ratio_stats["se_total"], } ) else: @@ -320,21 +319,30 @@ def calculate_variance(self, results: pl.DataFrame) -> pl.DataFrame: } ) - # Join variance results back to main results if variance_results: var_df = pl.DataFrame(variance_results) results = results.join(var_df, on=self.group_cols, how="left") else: - # No grouping, calculate overall variance - carb_stats = calculate_domain_total_variance(plot_data, "y_carb_i") - # Calculate total area for per-acre SE - total_area = (plot_data["EXPNS"] * plot_data["x_i"]).sum() - se_acre = carb_stats["se_total"] / total_area if total_area > 0 else 0.0 + # No grouping, calculate overall variance with ALL plots + all_plots_with_values = all_plots.join( + plot_data.select(["PLT_CN", "y_carb_i", "x_i"]), + on="PLT_CN", + how="left", + ).with_columns( + [ + pl.col("y_carb_i").fill_null(0.0), + pl.col("x_i").fill_null(0.0), + ] + ) + + ratio_stats = calculate_ratio_of_means_variance( + all_plots_with_values, "y_carb_i", "x_i" + ) results = results.with_columns( [ - pl.lit(se_acre).alias("CARBON_ACRE_SE"), - pl.lit(carb_stats["se_total"]).alias("CARBON_TOTAL_SE"), + pl.lit(ratio_stats["se_ratio"]).alias("CARBON_ACRE_SE"), + pl.lit(ratio_stats["se_total"]).alias("CARBON_TOTAL_SE"), ] ) diff --git a/src/pyfia/estimation/grm_base.py b/src/pyfia/estimation/grm_base.py index 15e6254..eff5d4b 100644 --- a/src/pyfia/estimation/grm_base.py +++ b/src/pyfia/estimation/grm_base.py @@ -413,7 +413,7 @@ def _calculate_grm_variance( ValueError If plot_tree_data is not available for variance calculation. """ - from .variance import calculate_domain_total_variance + from .variance import calculate_ratio_of_means_variance if plot_tree_data is None: raise ValueError( @@ -465,11 +465,15 @@ def _calculate_grm_variance( # Get ALL plots in the evaluation for proper variance calculation strat_data = self._get_stratification_data() - all_plots = ( - strat_data.select("PLT_CN", "STRATUM_CN", "EXPNS").unique().collect() - ) - - # Calculate variance + strat_schema = strat_data.collect_schema().names() + # Select B&P variance columns when available + bp_cols = ["ESTN_UNIT_CN", "STRATUM_WGT", "AREA_USED", "P2POINTCNT"] + select_cols = ["PLT_CN", "STRATUM_CN", "EXPNS"] + [ + c for c in bp_cols if c in strat_schema + ] + all_plots = strat_data.select(select_cols).unique().collect() + + # Calculate variance using ratio-of-means for per-acre SE if group_cols: variance_results = [] @@ -500,22 +504,15 @@ def _calculate_grm_variance( ) if len(all_plots_group) > 0: - # Calculate variance using domain total formula (matches EVALIDator) - var_stats = calculate_domain_total_variance(all_plots_group, "y_i") - - # Calculate per-acre SE by dividing total SE by total area - total_area = ( - all_plots_group["EXPNS"] * all_plots_group["x_i"] - ).sum() - se_acre = ( - var_stats["se_total"] / total_area if total_area > 0 else 0.0 + ratio_stats = calculate_ratio_of_means_variance( + all_plots_group, "y_i", "x_i" ) variance_results.append( { **group_dict, - acre_se_col: se_acre, - total_se_col: var_stats["se_total"], + acre_se_col: ratio_stats["se_ratio"], + total_se_col: ratio_stats["se_total"], } ) else: @@ -543,18 +540,14 @@ def _calculate_grm_variance( ] ) - var_stats = calculate_domain_total_variance(all_plots_with_values, "y_i") - - # Calculate per-acre SE by dividing total SE by total area - total_area = ( - all_plots_with_values["EXPNS"] * all_plots_with_values["x_i"] - ).sum() - se_acre = var_stats["se_total"] / total_area if total_area > 0 else 0.0 + ratio_stats = calculate_ratio_of_means_variance( + all_plots_with_values, "y_i", "x_i" + ) results = results.with_columns( [ - pl.lit(se_acre).alias(acre_se_col), - pl.lit(var_stats["se_total"]).alias(total_se_col), + pl.lit(ratio_stats["se_ratio"]).alias(acre_se_col), + pl.lit(ratio_stats["se_total"]).alias(total_se_col), ] ) diff --git a/src/pyfia/estimation/variance.py b/src/pyfia/estimation/variance.py index 3720ecf..a19d7a7 100644 --- a/src/pyfia/estimation/variance.py +++ b/src/pyfia/estimation/variance.py @@ -5,18 +5,52 @@ all estimation modules, implementing variance formulas from Bechtold & Patterson (2005), Chapter 4 (pp. 53-77). -The domain total variance formula is implemented: +Domain Total Variance (V(Y)): +----------------------------- -V(Ŷ) = Σ_h W_h² × s²_yh × n_h +The exact post-stratified domain total variance formula is implemented: + +For each estimation unit EU: + + V_EU = V1 + V2 + + V1 = (A²/n) × Σ_h W_h × s²_yh / n_h (within-stratum term) + V2 = (A²/n²) × Σ_h (1 - W_h) × s²_yh / n_h (post-stratification correction) + + V_total = Σ_EU V_EU Where: -- W_h is the stratum expansion factor (EXPNS, acres per plot) -- s²_yh is the sample variance within stratum h (with ddof=1) -- n_h is the number of plots in stratum h +- A = AREA_USED (total area of the estimation unit in acres) +- n = total number of phase 2 plots in the estimation unit (Σ_h n_h) +- W_h = STRATUM_WGT = P1POINTCNT / P1PNTCNT_EU (phase 1 stratum weight) +- s²_yh = sample variance within stratum h (with ddof=1) +- n_h = P2POINTCNT = number of phase 2 plots in stratum h + +The V2 term captures uncertainty from estimating stratum weights from +the sample. It is zero under proportional allocation (W_h = n_h/n) +and small when allocation is approximately proportional. This matches +rFIA's unitVar() implementation. + +Ratio-of-Means Variance (V(R)): +------------------------------- -This is the variance formula used by EVALIDator for tree-based estimates -(volume, biomass, TPA, GRM) and produces SE estimates within 1-3% of -EVALIDator output. +For per-acre estimates R = Y/X, the ratio-of-means variance from +Section 4.2 is used: + + V(R) = (1/X²) × [V(Y) + R² × V(X) - 2R × Cov(Y,X)] + +Where V(Y), V(X), and Cov(Y,X) are all computed using the same +V1+V2 post-stratified formula applied to the Y, X, and cross terms. + +Since Y and X are estimated from the same sample plots, their +covariance is typically positive (more forest area → more volume), +which means the ratio variance is less than the simplified +V(Y)/X² formula. The old formula se_acre = se_total/total_area +treated the denominator as known, overestimating per-acre SE. + +Fallback formulas: +- V(Y) = Σ_h EXPNS² × s²_yh × n_h (when B&P columns absent) +- V(R) = (1/X²) × Σ_h [w²_h × n_h × (s²_y - 2R×cov_yx + R²×s²_x)] Key implementation requirements: - Include ALL plots (even with zero values) in variance calculations @@ -51,6 +85,10 @@ def calculate_grouped_domain_total_variance( x_col: str = "x_i", stratum_col: str = "STRATUM_CN", weight_col: str = "EXPNS", + estn_unit_col: str = "ESTN_UNIT_CN", + stratum_wgt_col: str = "STRATUM_WGT", + area_used_col: str = "AREA_USED", + p2pointcnt_col: str = "P2POINTCNT", ) -> pl.DataFrame: """Calculate domain total variance for multiple groups in a single pass. @@ -58,8 +96,9 @@ def calculate_grouped_domain_total_variance( computes variance for all groups simultaneously using Polars group_by operations, avoiding the N+1 query pattern of iterating through groups. - Implements the stratified domain total variance formula: - V(Ŷ) = Σ_h w_h² × s²_yh × n_h + Implements the exact Bechtold & Patterson (2005) post-stratified + variance formula (V1 + V2) when B&P columns are available, falling + back to the simplified formula otherwise. Parameters ---------- @@ -76,6 +115,14 @@ def calculate_grouped_domain_total_variance( Column name for stratum assignment weight_col : str, default 'EXPNS' Column name for stratum weights (expansion factors) + estn_unit_col : str, default 'ESTN_UNIT_CN' + Column name for estimation unit identifier + stratum_wgt_col : str, default 'STRATUM_WGT' + Column name for phase 1 stratum weight + area_used_col : str, default 'AREA_USED' + Column name for total area of estimation unit + p2pointcnt_col : str, default 'P2POINTCNT' + Column name for number of phase 2 plots in stratum Returns ------- @@ -95,109 +142,498 @@ def calculate_grouped_domain_total_variance( valid_group_cols = [c for c in group_cols if c in plot_data.columns] if not valid_group_cols: - # No grouping - fall back to scalar calculation - var_stats = calculate_domain_total_variance( - plot_data, y_col, stratum_col, weight_col + # No grouping - fall back to scalar calculation with ratio variance + if x_col in plot_data.columns: + ratio_stats = calculate_ratio_of_means_variance( + plot_data, + y_col, + x_col, + stratum_col, + weight_col, + estn_unit_col, + stratum_wgt_col, + area_used_col, + p2pointcnt_col, + ) + return pl.DataFrame( + { + "se_acre": [ratio_stats["se_ratio"]], + "se_total": [ratio_stats["se_total"]], + "variance_acre": [ratio_stats["variance_ratio"]], + "variance_total": [ratio_stats["variance_total"]], + } + ) + else: + var_stats = calculate_domain_total_variance( + plot_data, + y_col, + stratum_col, + weight_col, + estn_unit_col, + stratum_wgt_col, + area_used_col, + p2pointcnt_col, + ) + return pl.DataFrame( + { + "se_acre": [0.0], + "se_total": [var_stats["se_total"]], + "variance_acre": [0.0], + "variance_total": [var_stats["variance_total"]], + } + ) + + # Check if exact B&P columns are available + has_bp_cols = all( + col in plot_data.columns + for col in [estn_unit_col, stratum_wgt_col, area_used_col, p2pointcnt_col] + ) + + if has_bp_cols: + return _calculate_grouped_exact_bp_variance( + plot_data, + valid_group_cols, + y_col, + x_col, + stratum_col, + weight_col, + estn_unit_col, + stratum_wgt_col, + area_used_col, + p2pointcnt_col, ) - # Calculate per-acre SE - total_x = ( - (plot_data[weight_col] * plot_data[x_col]).sum() - if x_col in plot_data.columns - else 1.0 + + # Fallback: simplified formula + return _calculate_grouped_simplified_variance( + plot_data, + valid_group_cols, + y_col, + x_col, + stratum_col, + weight_col, + ) + + +def _calculate_grouped_exact_bp_variance( + plot_data: pl.DataFrame, + valid_group_cols: List[str], + y_col: str, + x_col: str, + stratum_col: str, + weight_col: str, + estn_unit_col: str, + stratum_wgt_col: str, + area_used_col: str, + p2pointcnt_col: str, +) -> pl.DataFrame: + """Exact B&P grouped variance with V1 + V2 per estimation unit. + + Computes V(Y), V(X), and Cov(Y,X) per group and uses the ratio-of-means + formula for per-acre SE: V(R) = (1/X^2) * [V(Y) + R^2*V(X) - 2*R*Cov(Y,X)] + """ + # Group by (group_cols + EU + stratum) to get stratum stats + stratum_group_cols = valid_group_cols + [estn_unit_col, stratum_col] + + agg_exprs = [ + pl.count("PLT_CN").alias("n_h_actual"), + pl.mean(y_col).alias("ybar_h"), + pl.var(y_col, ddof=1).alias("s2_yh"), + pl.first(weight_col).cast(pl.Float64).alias("w_h"), + pl.first(stratum_wgt_col).cast(pl.Float64).alias("W_h"), + pl.first(area_used_col).cast(pl.Float64).alias("A"), + pl.first(p2pointcnt_col).cast(pl.Float64).alias("n_h_design"), + ] + + # Add X-related stats for ratio variance + if x_col in plot_data.columns: + agg_exprs.extend( + [ + pl.mean(x_col).alias("xbar_h"), + pl.var(x_col, ddof=1).alias("s2_xh"), + pl.cov(y_col, x_col, ddof=1).alias("cov_yxh"), + ] ) - se_acre = var_stats["se_total"] / total_x if total_x > 0 else 0.0 - return pl.DataFrame( - { - "se_acre": [se_acre], - "se_total": [var_stats["se_total"]], - "variance_acre": [se_acre**2], - "variance_total": [var_stats["variance_total"]], - } + + strata_stats = plot_data.group_by(stratum_group_cols).agg(agg_exprs) + + # Handle null variances/covariances + fill_exprs = [ + pl.col("s2_yh").fill_null(0.0).cast(pl.Float64).alias("s2_yh"), + pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + ] + if x_col in plot_data.columns: + fill_exprs.extend( + [ + pl.col("s2_xh").fill_null(0.0).cast(pl.Float64).alias("s2_xh"), + pl.col("cov_yxh").fill_null(0.0).cast(pl.Float64).alias("cov_yxh"), + pl.col("xbar_h").fill_null(0.0).cast(pl.Float64).alias("xbar_h"), + ] ) + strata_stats = strata_stats.with_columns(fill_exprs) - # Step 1: Calculate stratum-level statistics per group - # Group by (group_cols + stratum) to get stratum stats within each group - stratum_group_cols = valid_group_cols + [stratum_col] + # Calculate n (total plots per EU per group) + eu_group_cols = valid_group_cols + [estn_unit_col] + eu_totals = strata_stats.group_by(eu_group_cols).agg( + pl.sum("n_h_actual").alias("n_eu") + ) + strata_stats = strata_stats.join(eu_totals, on=eu_group_cols, how="left") + + # v_yh, v_xh, c_yxh = s²/n_h for strata with n_h > 1 + v_exprs = [ + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("s2_yh") / pl.col("n_h_design")) + .otherwise(0.0) + .alias("v_yh"), + ] + if x_col in plot_data.columns: + v_exprs.extend( + [ + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("s2_xh") / pl.col("n_h_design")) + .otherwise(0.0) + .alias("v_xh"), + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("cov_yxh") / pl.col("n_h_design")) + .otherwise(0.0) + .alias("c_yxh"), + ] + ) + strata_stats = strata_stats.with_columns(v_exprs) + + # V1 and V2 components for Y, X, and Cov + comp_exprs = [ + (pl.col("W_h") * pl.col("v_yh")).alias("v1_y"), + ((1.0 - pl.col("W_h")) * pl.col("v_yh")).alias("v2_y"), + ] + if x_col in plot_data.columns: + comp_exprs.extend( + [ + (pl.col("W_h") * pl.col("v_xh")).alias("v1_x"), + ((1.0 - pl.col("W_h")) * pl.col("v_xh")).alias("v2_x"), + (pl.col("W_h") * pl.col("c_yxh")).alias("v1_cov"), + ((1.0 - pl.col("W_h")) * pl.col("c_yxh")).alias("v2_cov"), + ] + ) + strata_stats = strata_stats.with_columns(comp_exprs) + + # Aggregate to EU level per group + eu_agg_exprs = [ + pl.sum("v1_y").alias("sum_v1_y"), + pl.sum("v2_y").alias("sum_v2_y"), + pl.first("A").alias("A"), + pl.first("n_eu").alias("n"), + ] + if x_col in plot_data.columns: + eu_agg_exprs.extend( + [ + pl.sum("v1_x").alias("sum_v1_x"), + pl.sum("v2_x").alias("sum_v2_x"), + pl.sum("v1_cov").alias("sum_v1_cov"), + pl.sum("v2_cov").alias("sum_v2_cov"), + ] + ) + + eu_variance = strata_stats.group_by(eu_group_cols).agg(eu_agg_exprs) + + # V_EU = (A²/n) × sum_v1 + (A²/n²) × sum_v2 for Y + eu_v_exprs = [ + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_y") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_y") + ).alias("V_y_EU"), + ] + if x_col in plot_data.columns: + eu_v_exprs.extend( + [ + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_x") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_x") + ).alias("V_x_EU"), + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_cov") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_cov") + ).alias("Cov_EU"), + ] + ) + eu_variance = eu_variance.with_columns(eu_v_exprs) - strata_stats = plot_data.group_by(stratum_group_cols).agg( + # Sum across EUs per group - need total_y and total_x per group too + # Compute total_y and total_x from strata_stats + totals_by_group = strata_stats.group_by(valid_group_cols).agg( [ - pl.count("PLT_CN").alias("n_h"), - pl.mean(y_col).alias("ybar_h"), - pl.var(y_col, ddof=1).alias("s2_yh"), - pl.first(weight_col).cast(pl.Float64).alias("w_h"), - # For per-acre calculation, need total area - (pl.col(weight_col).first() * pl.col(x_col).sum()).alias("stratum_area") - if x_col in plot_data.columns - else pl.lit(0.0).alias("stratum_area"), + (pl.col("ybar_h") * pl.col("w_h") * pl.col("n_h_actual")) + .sum() + .alias("total_y"), ] + + ( + [ + (pl.col("xbar_h") * pl.col("w_h") * pl.col("n_h_actual")) + .sum() + .alias("total_x") + ] + if x_col in plot_data.columns + else [] + ) ) - # Handle null variances (single observation or all same values) - strata_stats = strata_stats.with_columns( + group_agg_exprs = [ + pl.sum("V_y_EU").alias("variance_total"), + ] + if x_col in plot_data.columns: + group_agg_exprs.extend( + [ + pl.sum("V_x_EU").alias("variance_x"), + pl.sum("Cov_EU").alias("covariance"), + ] + ) + + variance_by_group = eu_variance.group_by(valid_group_cols).agg(group_agg_exprs) + + # Join totals + variance_by_group = variance_by_group.join( + totals_by_group, on=valid_group_cols, how="left" + ) + + # Clamp variance_total and compute SE + variance_by_group = variance_by_group.with_columns( [ - pl.when(pl.col("s2_yh").is_null()) + pl.when(pl.col("variance_total") < 0) .then(0.0) - .otherwise(pl.col("s2_yh")) - .cast(pl.Float64) - .alias("s2_yh"), - pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + .otherwise(pl.col("variance_total")) + .alias("variance_total"), + ] + ).with_columns( + [ + pl.col("variance_total").sqrt().alias("se_total"), ] ) - # Step 2: Calculate variance component per stratum (only for n_h > 1) - # v_h = w_h² × s²_yh × n_h + # Compute ratio variance for per-acre SE + if x_col in plot_data.columns: + variance_by_group = ( + variance_by_group.with_columns( + [ + pl.when(pl.col("total_x") > 0) + .then(pl.col("total_y") / pl.col("total_x")) + .otherwise(0.0) + .alias("ratio"), + ] + ) + .with_columns( + [ + pl.when(pl.col("total_x") > 0) + .then( + (1.0 / pl.col("total_x") ** 2) + * ( + pl.col("variance_total") + + pl.col("ratio") ** 2 * pl.col("variance_x") + - 2.0 * pl.col("ratio") * pl.col("covariance") + ) + ) + .otherwise(0.0) + .alias("variance_acre"), + ] + ) + .with_columns( + [ + # Clamp to zero if negative (numerical precision) + pl.when(pl.col("variance_acre") < 0) + .then(0.0) + .otherwise(pl.col("variance_acre")) + .alias("variance_acre"), + ] + ) + .with_columns( + [ + pl.col("variance_acre").sqrt().alias("se_acre"), + ] + ) + ) + else: + variance_by_group = variance_by_group.with_columns( + [ + pl.lit(0.0).alias("se_acre"), + pl.lit(0.0).alias("variance_acre"), + ] + ) + + result_cols = valid_group_cols + [ + "se_acre", + "se_total", + "variance_acre", + "variance_total", + ] + return variance_by_group.select(result_cols) + + +def _calculate_grouped_simplified_variance( + plot_data: pl.DataFrame, + valid_group_cols: List[str], + y_col: str, + x_col: str, + stratum_col: str, + weight_col: str, +) -> pl.DataFrame: + """Simplified grouped variance formula with ratio-of-means for per-acre SE. + + V(Y) = Σ_h W_h² × s²_yh × n_h + V(R) = (1/X^2) * Σ_h [w_h^2 * n_h * (s2_y - 2*R*cov_yx + R^2*s2_x)] + """ + stratum_group_cols = valid_group_cols + [stratum_col] + + agg_exprs = [ + pl.count("PLT_CN").alias("n_h"), + pl.mean(y_col).alias("ybar_h"), + pl.var(y_col, ddof=1).alias("s2_yh"), + pl.first(weight_col).cast(pl.Float64).alias("w_h"), + ] + + if x_col in plot_data.columns: + agg_exprs.extend( + [ + pl.mean(x_col).alias("xbar_h"), + pl.var(x_col, ddof=1).alias("s2_xh"), + pl.cov(y_col, x_col, ddof=1).alias("cov_yxh"), + ] + ) + + strata_stats = plot_data.group_by(stratum_group_cols).agg(agg_exprs) + + fill_exprs = [ + pl.col("s2_yh").fill_null(0.0).cast(pl.Float64).alias("s2_yh"), + pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + ] + if x_col in plot_data.columns: + fill_exprs.extend( + [ + pl.col("s2_xh").fill_null(0.0).cast(pl.Float64).alias("s2_xh"), + pl.col("cov_yxh").fill_null(0.0).cast(pl.Float64).alias("cov_yxh"), + pl.col("xbar_h").fill_null(0.0).cast(pl.Float64).alias("xbar_h"), + ] + ) + strata_stats = strata_stats.with_columns(fill_exprs) + + # Variance of Y total and totals strata_stats = strata_stats.with_columns( [ pl.when(pl.col("n_h") > 1) .then(pl.col("w_h") ** 2 * pl.col("s2_yh") * pl.col("n_h")) .otherwise(0.0) - .alias("v_h"), - # Total Y contribution from this stratum + .alias("v_y_h"), (pl.col("ybar_h") * pl.col("w_h") * pl.col("n_h")).alias("total_y_h"), ] + + ( + [(pl.col("xbar_h") * pl.col("w_h") * pl.col("n_h")).alias("total_x_h")] + if x_col in plot_data.columns + else [] + ) ) - # Step 3: Aggregate variance components by group - variance_by_group = strata_stats.group_by(valid_group_cols).agg( + group_agg_exprs = [ + pl.sum("v_y_h").alias("variance_total"), + pl.sum("total_y_h").alias("total_y"), + pl.sum("n_h").alias("n_plots"), + ] + if x_col in plot_data.columns: + group_agg_exprs.append(pl.sum("total_x_h").alias("total_x")) + + variance_by_group = strata_stats.group_by(valid_group_cols).agg(group_agg_exprs) + + # Clamp and compute SE total + variance_by_group = variance_by_group.with_columns( [ - pl.sum("v_h").alias("variance_total"), - pl.sum("total_y_h").alias("total_y"), - pl.sum("stratum_area").alias("total_area"), - pl.sum("n_h").alias("n_plots"), + pl.when(pl.col("variance_total") < 0) + .then(0.0) + .otherwise(pl.col("variance_total")) + .alias("variance_total"), + ] + ).with_columns( + [ + pl.col("variance_total").sqrt().alias("se_total"), ] ) - # Step 4: Calculate SE and per-acre variance - variance_by_group = ( - variance_by_group.with_columns( + # Compute ratio variance for per-acre SE + if x_col in plot_data.columns: + # Need to compute ratio per group for the simplified ratio variance formula + variance_by_group = variance_by_group.with_columns( [ - # Clamp negative variances to 0 - pl.when(pl.col("variance_total") < 0) - .then(0.0) - .otherwise(pl.col("variance_total")) - .alias("variance_total"), + pl.when(pl.col("total_x") > 0) + .then(pl.col("total_y") / pl.col("total_x")) + .otherwise(0.0) + .alias("ratio"), ] ) - .with_columns( + + # Compute ratio variance per stratum: w_h^2 * n_h * (s2_y - 2R*cov + R^2*s2_x) + # We need the ratio per group joined back to strata_stats + strata_with_ratio = strata_stats.join( + variance_by_group.select(valid_group_cols + ["ratio", "total_x"]), + on=valid_group_cols, + how="left", + ) + + strata_with_ratio = strata_with_ratio.with_columns( [ - # SE total = sqrt(variance_total) - pl.col("variance_total").sqrt().alias("se_total"), - # SE acre = SE_total / total_area - pl.when(pl.col("total_area") > 0) - .then(pl.col("variance_total").sqrt() / pl.col("total_area")) + pl.when(pl.col("n_h") > 1) + .then( + pl.col("w_h") ** 2 + * pl.col("n_h") + * ( + pl.col("s2_yh") + - 2.0 * pl.col("ratio") * pl.col("cov_yxh") + + pl.col("ratio") ** 2 * pl.col("s2_xh") + ) + ) .otherwise(0.0) - .alias("se_acre"), + .alias("v_ratio_h"), ] ) - .with_columns( + + ratio_var_by_group = strata_with_ratio.group_by(valid_group_cols).agg( [ - # Variance acre = SE_acre² - (pl.col("se_acre") ** 2).alias("variance_acre"), + pl.sum("v_ratio_h").alias("total_ratio_var"), + pl.first("total_x").alias("total_x"), + ] + ) + + ratio_var_by_group = ( + ratio_var_by_group.with_columns( + [ + pl.when(pl.col("total_x") > 0) + .then(pl.col("total_ratio_var") / pl.col("total_x") ** 2) + .otherwise(0.0) + .alias("variance_acre"), + ] + ) + .with_columns( + [ + pl.when(pl.col("variance_acre") < 0) + .then(0.0) + .otherwise(pl.col("variance_acre")) + .alias("variance_acre"), + ] + ) + .with_columns( + [ + pl.col("variance_acre").sqrt().alias("se_acre"), + ] + ) + ) + + variance_by_group = variance_by_group.join( + ratio_var_by_group.select(valid_group_cols + ["variance_acre", "se_acre"]), + on=valid_group_cols, + how="left", + ) + else: + variance_by_group = variance_by_group.with_columns( + [ + pl.lit(0.0).alias("se_acre"), + pl.lit(0.0).alias("variance_acre"), ] ) - ) - # Select only the columns we need result_cols = valid_group_cols + [ "se_acre", "se_total", @@ -212,30 +648,36 @@ def calculate_domain_total_variance( y_col: str, stratum_col: str = "STRATUM_CN", weight_col: str = "EXPNS", + estn_unit_col: str = "ESTN_UNIT_CN", + stratum_wgt_col: str = "STRATUM_WGT", + area_used_col: str = "AREA_USED", + p2pointcnt_col: str = "P2POINTCNT", ) -> Dict[str, float]: """Calculate variance for domain total estimation. - Implements the stratified domain total variance formula from - Bechtold & Patterson (2005), which is used by EVALIDator for - tree-based attributes (volume, biomass, tree count, etc.): + Implements the exact post-stratified variance formula from Bechtold & + Patterson (2005), Section 4.2, pp. 55-60, as used by rFIA's unitVar(). - V(Ŷ) = Σ_h w_h² × s²_yh × n_h + For each estimation unit EU: - Where: - - Y is the attribute of interest (volume, biomass, tree count, etc.) - - s²_yh is the sample variance of Y in stratum h (with ddof=1) - - w_h is the stratum weight (EXPNS = acres/plot in stratum h) - - n_h is the number of plots in stratum h + n = Σ_h n_h (total phase 2 plots in EU) + A = AREA_USED (total area of the estimation unit) - This is the standard FIA variance formula for domain totals, which - does NOT include ratio adjustment terms. EVALIDator uses this formula - for tree-based estimates because the domain total is calculated - directly from plot-level expanded values. + For each stratum h with n_h > 1: + v_h = s²_yh / n_h (variance of stratum mean) - Note: This differs from `calculate_ratio_variance` which includes - covariance terms for ratio-of-means estimation. For tree attributes - where Y already incorporates expansion factors, the simpler domain - total variance is appropriate and matches EVALIDator output. + V1 = (A²/n) × Σ_h W_h × v_h (main within-stratum term) + V2 = (A²/n²) × Σ_h (1 - W_h) × v_h (post-stratification correction) + V_EU = V1 + V2 + + V_total = Σ_EU V_EU + + Where: + - A = AREA_USED (total area of the estimation unit in acres) + - n = total number of phase 2 plots in the estimation unit + - W_h = STRATUM_WGT = P1POINTCNT / P1PNTCNT_EU (phase 1 stratum weight) + - s²_yh = sample variance within stratum h (ddof=1) + - n_h = P2POINTCNT = number of phase 2 plots in stratum h Parameters ---------- @@ -246,12 +688,24 @@ def calculate_domain_total_variance( - y_col: Attribute values (expanded to per-acre or total) - stratum_col: Stratum assignment - weight_col: Expansion factors + - estn_unit_col: Estimation unit identifier + - stratum_wgt_col: Phase 1 stratum weight (W_h) + - area_used_col: Total area of the estimation unit (acres) + - p2pointcnt_col: Number of phase 2 plots in stratum y_col : str Column name for Y values stratum_col : str, default 'STRATUM_CN' Column name for stratum assignment weight_col : str, default 'EXPNS' Column name for stratum weights (expansion factors) + estn_unit_col : str, default 'ESTN_UNIT_CN' + Column name for estimation unit identifier + stratum_wgt_col : str, default 'STRATUM_WGT' + Column name for phase 1 stratum weight + area_used_col : str, default 'AREA_USED' + Column name for total area of estimation unit + p2pointcnt_col : str, default 'P2POINTCNT' + Column name for number of phase 2 plots in stratum Returns ------- @@ -269,6 +723,7 @@ def calculate_domain_total_variance( - Single-plot strata (excluded from variance calculation) - Null variances (treated as 0) - Missing stratification (treated as single stratum) + - Multiple estimation units (variance computed per-EU then summed) References ---------- @@ -280,10 +735,158 @@ def calculate_domain_total_variance( """ # Determine stratification columns if stratum_col not in plot_data.columns: - # No stratification, treat as single stratum plot_data = plot_data.with_columns(pl.lit(1).alias("_STRATUM")) stratum_col = "_STRATUM" + # Check if exact B&P columns are available + has_bp_cols = all( + col in plot_data.columns + for col in [estn_unit_col, stratum_wgt_col, area_used_col, p2pointcnt_col] + ) + + if has_bp_cols: + return _calculate_exact_bp_variance( + plot_data, + y_col, + stratum_col, + weight_col, + estn_unit_col, + stratum_wgt_col, + area_used_col, + p2pointcnt_col, + ) + + # Fallback: simplified formula for backward compatibility (e.g., area estimator) + return _calculate_simplified_variance( + plot_data, + y_col, + stratum_col, + weight_col, + ) + + +def _calculate_exact_bp_variance( + plot_data: pl.DataFrame, + y_col: str, + stratum_col: str, + weight_col: str, + estn_unit_col: str, + stratum_wgt_col: str, + area_used_col: str, + p2pointcnt_col: str, +) -> Dict[str, float]: + """Exact Bechtold & Patterson post-stratified variance (V1 + V2). + + Computes variance per estimation unit then sums across EUs. + """ + # Calculate stratum-level statistics within each EU + strata_stats = plot_data.group_by([estn_unit_col, stratum_col]).agg( + [ + pl.count("PLT_CN").alias("n_h_actual"), + pl.mean(y_col).alias("ybar_h"), + pl.var(y_col, ddof=1).alias("s2_yh"), + pl.first(weight_col).cast(pl.Float64).alias("w_h"), + pl.first(stratum_wgt_col).cast(pl.Float64).alias("W_h"), + pl.first(area_used_col).cast(pl.Float64).alias("A"), + pl.first(p2pointcnt_col).cast(pl.Float64).alias("n_h_design"), + ] + ) + + # Handle null variances (single observation or all same values) + strata_stats = strata_stats.with_columns( + [ + pl.when(pl.col("s2_yh").is_null()) + .then(0.0) + .otherwise(pl.col("s2_yh")) + .cast(pl.Float64) + .alias("s2_yh"), + pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + ] + ) + + # Calculate total Y using expansion factors + total_y = ( + strata_stats["ybar_h"] * strata_stats["w_h"] * strata_stats["n_h_actual"] + ).sum() + + n_strata = len(strata_stats) + n_plots = int(strata_stats["n_h_actual"].sum()) + + # Calculate n (total plots per EU) for exact formula + eu_totals = strata_stats.group_by(estn_unit_col).agg( + pl.sum("n_h_actual").alias("n_eu") + ) + strata_stats = strata_stats.join(eu_totals, on=estn_unit_col, how="left") + + # B&P post-stratified variance uses s²_h directly (NOT s²_h/n_h). + # The formula V(ȳ_ps) = (1/n)Σ W_h s²_h + (1/n²)Σ (1-W_h) s²_h + # already accounts for sample size through the A²/n and A²/n² terms. + strata_stats = strata_stats.with_columns( + [ + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("s2_yh")) + .otherwise(0.0) + .alias("v_h"), + ] + ) + + # Calculate V1 and V2 components per stratum + # V1_component = W_h × v_h + # V2_component = (1 - W_h) × v_h + strata_stats = strata_stats.with_columns( + [ + (pl.col("W_h") * pl.col("v_h")).alias("v1_component"), + ((1.0 - pl.col("W_h")) * pl.col("v_h")).alias("v2_component"), + ] + ) + + # Aggregate to EU level + eu_variance = strata_stats.group_by(estn_unit_col).agg( + [ + pl.sum("v1_component").alias("sum_v1"), + pl.sum("v2_component").alias("sum_v2"), + pl.first("A").alias("A"), + pl.first("n_eu").alias("n"), + ] + ) + + # Calculate V_EU = (A²/n) × sum_v1 + (A²/n²) × sum_v2 + eu_variance = eu_variance.with_columns( + [ + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2") + ).alias("V_EU"), + ] + ) + + # Sum across estimation units + variance_total = eu_variance["V_EU"].drop_nans().sum() + if variance_total is None or variance_total < 0: + variance_total = 0.0 + + se_total = variance_total**0.5 + + return { + "variance_total": variance_total, + "se_total": se_total, + "total_y": total_y, + "n_strata": n_strata, + "n_plots": n_plots, + } + + +def _calculate_simplified_variance( + plot_data: pl.DataFrame, + y_col: str, + stratum_col: str, + weight_col: str, +) -> Dict[str, float]: + """Simplified variance formula V = Σ_h W_h² × s²_yh × n_h. + + Used as fallback when exact B&P columns are not available + (e.g., area estimator which has its own variance path). + """ # Calculate stratum-level statistics strata_stats = plot_data.group_by(stratum_col).agg( [ @@ -306,25 +909,21 @@ def calculate_domain_total_variance( ] ) - # Calculate population total using expansion factors - # Total Y = Σ_h (ybar_h × w_h × n_h) + # Calculate population total total_y = (strata_stats["ybar_h"] * strata_stats["w_h"] * strata_stats["n_h"]).sum() - # Filter out single-plot strata (variance undefined with n=1) + # Filter out single-plot strata strata_with_variance = strata_stats.filter(pl.col("n_h") > 1) - # Calculate variance components only for strata with n > 1 # V(Ŷ) = Σ_h w_h² × s²_yh × n_h variance_components = strata_with_variance.with_columns( [(pl.col("w_h") ** 2 * pl.col("s2_yh") * pl.col("n_h")).alias("v_h")] ) - # Sum variance components, handling NaN values variance_total = variance_components["v_h"].drop_nans().sum() if variance_total is None or variance_total < 0: variance_total = 0.0 - # Standard error se_total = variance_total**0.5 return { @@ -336,6 +935,349 @@ def calculate_domain_total_variance( } +def calculate_ratio_of_means_variance( + plot_data: pl.DataFrame, + y_col: str, + x_col: str = "x_i", + stratum_col: str = "STRATUM_CN", + weight_col: str = "EXPNS", + estn_unit_col: str = "ESTN_UNIT_CN", + stratum_wgt_col: str = "STRATUM_WGT", + area_used_col: str = "AREA_USED", + p2pointcnt_col: str = "P2POINTCNT", +) -> Dict[str, float]: + """Calculate variance for ratio-of-means estimation (per-acre estimates). + + Implements the ratio-of-means variance formula from Bechtold & Patterson + (2005), Section 4.2: + + V(R) = (1/X^2) * [V(Y) + R^2 * V(X) - 2*R*Cov(Y,X)] + + Where R = Y/X is the ratio (per-acre estimate), Y is the tree attribute + total, X is the total area, and V(Y), V(X), Cov(Y,X) are computed using + the exact post-stratified B&P formula or simplified fallback. + + Since Y and X are estimated from the same sample plots, their covariance + is typically positive (more forest area -> more volume), which means + the ratio variance is typically less than se_total^2 / total_x^2. + + Parameters + ---------- + plot_data : pl.DataFrame + Plot-level data with y_col and x_col values, plus stratification. + y_col : str + Column name for Y values (tree attribute per plot) + x_col : str, default 'x_i' + Column name for X values (area proportion per plot) + stratum_col : str, default 'STRATUM_CN' + Column name for stratum assignment + weight_col : str, default 'EXPNS' + Column name for stratum weights (expansion factors) + estn_unit_col : str, default 'ESTN_UNIT_CN' + Column name for estimation unit identifier + stratum_wgt_col : str, default 'STRATUM_WGT' + Column name for phase 1 stratum weight + area_used_col : str, default 'AREA_USED' + Column name for total area of estimation unit + p2pointcnt_col : str, default 'P2POINTCNT' + Column name for number of phase 2 plots in stratum + + Returns + ------- + dict + Dictionary with keys: + - variance_total: Variance of total Y estimate V(Y) + - se_total: Standard error of total Y estimate + - variance_ratio: Variance of ratio estimate V(R) + - se_ratio: Standard error of ratio estimate + - total_y: Estimated total Y + - total_x: Estimated total X (area) + - ratio: Estimated ratio R = Y/X + """ + if stratum_col not in plot_data.columns: + plot_data = plot_data.with_columns(pl.lit(1).alias("_STRATUM")) + stratum_col = "_STRATUM" + + has_bp_cols = all( + col in plot_data.columns + for col in [estn_unit_col, stratum_wgt_col, area_used_col, p2pointcnt_col] + ) + + if has_bp_cols: + return _calculate_exact_bp_ratio_variance( + plot_data, + y_col, + x_col, + stratum_col, + weight_col, + estn_unit_col, + stratum_wgt_col, + area_used_col, + p2pointcnt_col, + ) + + return _calculate_simplified_ratio_variance( + plot_data, + y_col, + x_col, + stratum_col, + weight_col, + ) + + +def _calculate_exact_bp_ratio_variance( + plot_data: pl.DataFrame, + y_col: str, + x_col: str, + stratum_col: str, + weight_col: str, + estn_unit_col: str, + stratum_wgt_col: str, + area_used_col: str, + p2pointcnt_col: str, +) -> Dict[str, float]: + """Exact B&P ratio-of-means variance with V1+V2 per estimation unit. + + Computes V(Y), V(X), and Cov(Y,X) using the exact post-stratified + formula, then applies the ratio variance formula: + + V(R) = (1/X^2) * [V(Y) + R^2 * V(X) - 2*R*Cov(Y,X)] + """ + # Stratum-level statistics for Y, X, and their covariance + strata_stats = plot_data.group_by([estn_unit_col, stratum_col]).agg( + [ + pl.count("PLT_CN").alias("n_h_actual"), + pl.mean(y_col).alias("ybar_h"), + pl.mean(x_col).alias("xbar_h"), + pl.var(y_col, ddof=1).alias("s2_yh"), + pl.var(x_col, ddof=1).alias("s2_xh"), + pl.cov(y_col, x_col, ddof=1).alias("cov_yxh"), + pl.first(weight_col).cast(pl.Float64).alias("w_h"), + pl.first(stratum_wgt_col).cast(pl.Float64).alias("W_h"), + pl.first(area_used_col).cast(pl.Float64).alias("A"), + pl.first(p2pointcnt_col).cast(pl.Float64).alias("n_h_design"), + ] + ) + + # Handle null variances/covariances + strata_stats = strata_stats.with_columns( + [ + pl.col("s2_yh").fill_null(0.0).cast(pl.Float64).alias("s2_yh"), + pl.col("s2_xh").fill_null(0.0).cast(pl.Float64).alias("s2_xh"), + pl.col("cov_yxh").fill_null(0.0).cast(pl.Float64).alias("cov_yxh"), + pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + pl.col("xbar_h").fill_null(0.0).cast(pl.Float64).alias("xbar_h"), + ] + ) + + # Total Y and X using expansion factors + total_y = ( + strata_stats["ybar_h"] * strata_stats["w_h"] * strata_stats["n_h_actual"] + ).sum() + total_x = ( + strata_stats["xbar_h"] * strata_stats["w_h"] * strata_stats["n_h_actual"] + ).sum() + + # n per EU + eu_totals = strata_stats.group_by(estn_unit_col).agg( + pl.sum("n_h_actual").alias("n_eu") + ) + strata_stats = strata_stats.join(eu_totals, on=estn_unit_col, how="left") + + # B&P post-stratified variance uses s²_h directly (NOT s²_h/n_h). + # The formula V(ȳ_ps) = (1/n)Σ W_h s²_h + (1/n²)Σ (1-W_h) s²_h + # already accounts for sample size through the A²/n and A²/n² terms. + strata_stats = strata_stats.with_columns( + [ + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("s2_yh")) + .otherwise(0.0) + .alias("v_yh"), + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("s2_xh")) + .otherwise(0.0) + .alias("v_xh"), + pl.when(pl.col("n_h_actual") > 1) + .then(pl.col("cov_yxh")) + .otherwise(0.0) + .alias("c_yxh"), + ] + ) + + # V1 and V2 components for Y, X, and Cov + strata_stats = strata_stats.with_columns( + [ + # Y variance components + (pl.col("W_h") * pl.col("v_yh")).alias("v1_y"), + ((1.0 - pl.col("W_h")) * pl.col("v_yh")).alias("v2_y"), + # X variance components + (pl.col("W_h") * pl.col("v_xh")).alias("v1_x"), + ((1.0 - pl.col("W_h")) * pl.col("v_xh")).alias("v2_x"), + # Covariance components + (pl.col("W_h") * pl.col("c_yxh")).alias("v1_cov"), + ((1.0 - pl.col("W_h")) * pl.col("c_yxh")).alias("v2_cov"), + ] + ) + + # Aggregate to EU level + eu_variance = strata_stats.group_by(estn_unit_col).agg( + [ + pl.sum("v1_y").alias("sum_v1_y"), + pl.sum("v2_y").alias("sum_v2_y"), + pl.sum("v1_x").alias("sum_v1_x"), + pl.sum("v2_x").alias("sum_v2_x"), + pl.sum("v1_cov").alias("sum_v1_cov"), + pl.sum("v2_cov").alias("sum_v2_cov"), + pl.first("A").alias("A"), + pl.first("n_eu").alias("n"), + ] + ) + + # V_EU = (A²/n) * sum_v1 + (A²/n²) * sum_v2 for Y, X, and Cov + eu_variance = eu_variance.with_columns( + [ + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_y") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_y") + ).alias("V_y_EU"), + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_x") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_x") + ).alias("V_x_EU"), + ( + (pl.col("A") ** 2 / pl.col("n")) * pl.col("sum_v1_cov") + + (pl.col("A") ** 2 / pl.col("n") ** 2) * pl.col("sum_v2_cov") + ).alias("Cov_EU"), + ] + ) + + # Sum across EUs + variance_y = eu_variance["V_y_EU"].drop_nans().sum() + variance_x = eu_variance["V_x_EU"].drop_nans().sum() + covariance = eu_variance["Cov_EU"].drop_nans().sum() + + if variance_y is None or variance_y < 0: + variance_y = 0.0 + + se_total = variance_y**0.5 + + # Ratio variance: V(R) = (1/X^2) * [V(Y) + R^2*V(X) - 2*R*Cov(Y,X)] + if total_x is not None and total_x > 0: + ratio = total_y / total_x + variance_ratio = (1.0 / total_x**2) * ( + variance_y + ratio**2 * variance_x - 2.0 * ratio * covariance + ) + # Clamp to zero if negative (numerical precision) + if variance_ratio < 0: + variance_ratio = 0.0 + se_ratio = variance_ratio**0.5 + else: + ratio = 0.0 + variance_ratio = 0.0 + se_ratio = 0.0 + + return { + "variance_total": variance_y, + "se_total": se_total, + "variance_ratio": variance_ratio, + "se_ratio": se_ratio, + "total_y": total_y, + "total_x": total_x, + "ratio": ratio, + } + + +def _calculate_simplified_ratio_variance( + plot_data: pl.DataFrame, + y_col: str, + x_col: str, + stratum_col: str, + weight_col: str, +) -> Dict[str, float]: + """Simplified ratio-of-means variance (no B&P columns). + + Uses the per-stratum formula: + V(R) = (1/X^2) * Σ_h [w_h^2 * n_h * (s2_y - 2*R*cov_yx + R^2*s2_x)] + """ + strata_stats = plot_data.group_by(stratum_col).agg( + [ + pl.count("PLT_CN").alias("n_h"), + pl.mean(y_col).alias("ybar_h"), + pl.mean(x_col).alias("xbar_h"), + pl.var(y_col, ddof=1).alias("s2_y"), + pl.var(x_col, ddof=1).alias("s2_x"), + pl.cov(y_col, x_col, ddof=1).alias("cov_yx"), + pl.first(weight_col).cast(pl.Float64).alias("w_h"), + ] + ) + + strata_stats = strata_stats.with_columns( + [ + pl.col("s2_y").fill_null(0.0).cast(pl.Float64).alias("s2_y"), + pl.col("s2_x").fill_null(0.0).cast(pl.Float64).alias("s2_x"), + pl.col("cov_yx").fill_null(0.0).cast(pl.Float64).alias("cov_yx"), + pl.col("ybar_h").fill_null(0.0).cast(pl.Float64).alias("ybar_h"), + pl.col("xbar_h").fill_null(0.0).cast(pl.Float64).alias("xbar_h"), + ] + ) + + # Total Y and X + total_y = (strata_stats["ybar_h"] * strata_stats["w_h"] * strata_stats["n_h"]).sum() + total_x = (strata_stats["xbar_h"] * strata_stats["w_h"] * strata_stats["n_h"]).sum() + + # Variance of Y total (simplified formula) + strata_with_variance = strata_stats.filter(pl.col("n_h") > 1) + variance_y_components = strata_with_variance.with_columns( + [(pl.col("w_h") ** 2 * pl.col("s2_y") * pl.col("n_h")).alias("v_y_h")] + ) + variance_y = variance_y_components["v_y_h"].drop_nans().sum() + if variance_y is None or variance_y < 0: + variance_y = 0.0 + + se_total = variance_y**0.5 + + # Ratio variance using covariance + if total_x is not None and total_x > 0: + ratio = total_y / total_x + + # V(total_ratio) = Σ_h [w_h^2 * n_h * (s2_y - 2*R*cov_yx + R^2*s2_x)] + ratio_var_components = strata_with_variance.with_columns( + [ + ( + pl.col("w_h") ** 2 + * pl.col("n_h") + * ( + pl.col("s2_y") + - 2.0 * ratio * pl.col("cov_yx") + + ratio**2 * pl.col("s2_x") + ) + ).alias("v_ratio_h") + ] + ) + total_ratio_variance = ratio_var_components["v_ratio_h"].drop_nans().sum() + if total_ratio_variance is None or total_ratio_variance < 0: + total_ratio_variance = 0.0 + + variance_ratio = total_ratio_variance / total_x**2 + if variance_ratio < 0: + variance_ratio = 0.0 + se_ratio = variance_ratio**0.5 + else: + ratio = 0.0 + variance_ratio = 0.0 + se_ratio = 0.0 + + return { + "variance_total": variance_y, + "se_total": se_total, + "variance_ratio": variance_ratio, + "se_ratio": se_ratio, + "total_y": total_y, + "total_x": total_x, + "ratio": ratio, + } + + # ============================================================================= # Utility functions salvaged from statistics.py # ============================================================================= diff --git a/tests/unit/test_variance_formulas.py b/tests/unit/test_variance_formulas.py index 8df512f..0152223 100644 --- a/tests/unit/test_variance_formulas.py +++ b/tests/unit/test_variance_formulas.py @@ -1172,3 +1172,698 @@ def test_stratified_sampling_additivity(self, mock_fia_database): f"Additivity violation: V1={v1}, V2={v2}, expected total={expected_total}, " f"got {var_stats['variance']}" ) + + +# ============================================================================= +# TestExactBPVarianceFormula +# ============================================================================= + + +class TestExactBPVarianceFormula: + """ + Test the exact Bechtold & Patterson (2005) post-stratified variance formula. + + The exact formula is: + V_EU = (A²/n) × Σ_h W_h × s²_yh + (A²/n²) × Σ_h (1-W_h) × s²_yh + = V1 + V2 + + Where V2 is the post-stratification correction term that captures + uncertainty from estimating stratum weights from the sample. + Note: s²_yh is used directly (NOT s²_yh/n_h). The sample size is + accounted for in the A²/n and A²/n² terms. + + These tests directly call calculate_domain_total_variance with all B&P + columns present to exercise the exact formula path. + """ + + def test_single_eu_single_stratum_hand_calculated(self): + """ + Hand-calculated exact B&P variance for a single EU with one stratum. + + Setup: + - 1 EU with AREA_USED = 10000 acres + - 1 stratum: W_h = 1.0 (all P1 points in this stratum) + - n_h = 4 plots + - y values = [10, 20, 30, 40] + - s² = var([10,20,30,40], ddof=1) = 166.6667 + - n = 4 (total plots in EU) + + B&P formula uses s² directly (NOT s²/n_h): + V1 = (A²/n) × W_h × s² + = (10000²/4) × 1.0 × 166.6667 + = 25000000 × 166.6667 + = 4,166,666,750 + + V2 = (A²/n²) × (1 - W_h) × s² + = 0 (since W_h = 1.0) + + V_total = V1 + V2 = 4,166,666,750 + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 1], + "EXPNS": [2500.0, 2500.0, 2500.0, 2500.0], + "ESTN_UNIT_CN": [100, 100, 100, 100], + "STRATUM_WGT": [1.0, 1.0, 1.0, 1.0], + "AREA_USED": [10000.0, 10000.0, 10000.0, 10000.0], + "P2POINTCNT": [4.0, 4.0, 4.0, 4.0], + "y_i": [10.0, 20.0, 30.0, 40.0], + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # Hand calculation: B&P uses s² directly in V1/V2 + s2 = np.var([10, 20, 30, 40], ddof=1) # 166.6667 + A = 10000.0 + n = 4 + W_h = 1.0 + + v1 = (A**2 / n) * W_h * s2 + v2 = (A**2 / n**2) * (1 - W_h) * s2 + expected_variance = v1 + v2 + + assert abs(result["variance_total"] - expected_variance) < 1.0, ( + f"Expected {expected_variance}, got {result['variance_total']}" + ) + assert abs(result["se_total"] - expected_variance**0.5) < 0.01 + + def test_two_strata_proportional_allocation(self): + """ + With proportional allocation (W_h = n_h/n), V2 should be zero, + and the result should match the simplified formula. + + Setup: + - 1 EU, AREA_USED = 10000 + - Stratum A: W_h = 0.6, n_h = 3, y = [10, 20, 30] + - Stratum B: W_h = 0.4, n_h = 2, y = [50, 60] + - n = 5, proportional: W_A = 3/5 = 0.6, W_B = 2/5 = 0.4 + + When W_h = n_h/n exactly, V2 terms cancel out: + (1 - W_h) = (1 - n_h/n) = (n - n_h)/n + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4", "P5"], + "STRATUM_CN": [1, 1, 1, 2, 2], + "EXPNS": [2000.0, 2000.0, 2000.0, 2000.0, 2000.0], + "ESTN_UNIT_CN": [100, 100, 100, 100, 100], + "STRATUM_WGT": [0.6, 0.6, 0.6, 0.4, 0.4], + "AREA_USED": [10000.0] * 5, + "P2POINTCNT": [3.0, 3.0, 3.0, 2.0, 2.0], + "y_i": [10.0, 20.0, 30.0, 50.0, 60.0], + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # Hand calculation: B&P uses s² directly (NOT s²/n_h) + A = 10000.0 + n = 5 + + s2_a = np.var([10, 20, 30], ddof=1) # 100 + s2_b = np.var([50, 60], ddof=1) # 50 + + v1 = (A**2 / n) * (0.6 * s2_a + 0.4 * s2_b) + v2 = (A**2 / n**2) * ((1 - 0.6) * s2_a + (1 - 0.4) * s2_b) + expected = v1 + v2 + + assert abs(result["variance_total"] - expected) < 1.0, ( + f"Expected {expected}, got {result['variance_total']}" + ) + + def test_non_proportional_allocation_v2_positive(self): + """ + With non-proportional allocation, V2 should be > 0. + + Setup: + - 1 EU, AREA_USED = 10000 + - Stratum A: W_h = 0.9 (many P1 points), n_h = 2 (few P2 plots) + - Stratum B: W_h = 0.1 (few P1 points), n_h = 8 (many P2 plots) + - n = 10 + + Non-proportional because: + W_A = 0.9 but n_A/n = 2/10 = 0.2 + W_B = 0.1 but n_B/n = 8/10 = 0.8 + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + np.random.seed(42) + y_a = np.random.uniform(10, 30, 2).tolist() + y_b = np.random.uniform(40, 60, 8).tolist() + + plot_data = pl.DataFrame( + { + "PLT_CN": [f"P{i}" for i in range(10)], + "STRATUM_CN": [1, 1] + [2] * 8, + "EXPNS": [5000.0] * 2 + [125.0] * 8, + "ESTN_UNIT_CN": [100] * 10, + "STRATUM_WGT": [0.9] * 2 + [0.1] * 8, + "AREA_USED": [10000.0] * 10, + "P2POINTCNT": [2.0] * 2 + [8.0] * 8, + "y_i": y_a + y_b, + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # Calculate V2 manually to verify it's positive + # B&P uses s² directly (NOT s²/n_h) + A = 10000.0 + n = 10 + s2_a = np.var(y_a, ddof=1) + s2_b = np.var(y_b, ddof=1) + + v2 = (A**2 / n**2) * ((1 - 0.9) * s2_a + (1 - 0.1) * s2_b) + assert v2 > 0, "V2 should be positive for non-proportional allocation" + + v1 = (A**2 / n) * (0.9 * s2_a + 0.1 * s2_b) + expected = v1 + v2 + + assert abs(result["variance_total"] - expected) < 1.0, ( + f"Expected {expected}, got {result['variance_total']}" + ) + assert result["variance_total"] > 0 + + def test_multiple_estimation_units(self): + """ + Test variance aggregation across multiple estimation units. + + Setup: + - EU 1: AREA_USED = 8000, 1 stratum, W_h = 1.0, n_h = 3 + - EU 2: AREA_USED = 12000, 1 stratum, W_h = 1.0, n_h = 2 + - V_total = V_EU1 + V_EU2 + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4", "P5"], + "STRATUM_CN": [1, 1, 1, 2, 2], + "EXPNS": [2666.67, 2666.67, 2666.67, 6000.0, 6000.0], + "ESTN_UNIT_CN": [100, 100, 100, 200, 200], + "STRATUM_WGT": [1.0, 1.0, 1.0, 1.0, 1.0], + "AREA_USED": [8000.0, 8000.0, 8000.0, 12000.0, 12000.0], + "P2POINTCNT": [3.0, 3.0, 3.0, 2.0, 2.0], + "y_i": [10.0, 20.0, 30.0, 50.0, 70.0], + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # EU 1: A=8000, n=3, W_h=1.0 — B&P uses s² directly + s2_1 = np.var([10, 20, 30], ddof=1) # 100 + v_eu1 = (8000**2 / 3) * 1.0 * s2_1 # V1 only (V2=0 since W=1) + + # EU 2: A=12000, n=2, W_h=1.0 + s2_2 = np.var([50, 70], ddof=1) # 200 + v_eu2 = (12000**2 / 2) * 1.0 * s2_2 # V1 only + + expected = v_eu1 + v_eu2 + + assert abs(result["variance_total"] - expected) < 1.0, ( + f"Expected {expected}, got {result['variance_total']}" + ) + + def test_single_plot_stratum_excluded(self): + """ + Strata with n_h = 1 should contribute 0 variance (s² undefined). + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 2], # Stratum 2 has only 1 plot + "EXPNS": [2500.0, 2500.0, 2500.0, 2500.0], + "ESTN_UNIT_CN": [100, 100, 100, 100], + "STRATUM_WGT": [0.75, 0.75, 0.75, 0.25], + "AREA_USED": [10000.0] * 4, + "P2POINTCNT": [3.0, 3.0, 3.0, 1.0], + "y_i": [10.0, 20.0, 30.0, 100.0], + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # Only stratum 1 contributes (stratum 2 has n_h=1) + # B&P uses s² directly (NOT s²/n_h) + A = 10000.0 + n = 4 + s2_1 = np.var([10, 20, 30], ddof=1) # 100 + # Stratum 2 s2 = 0 (excluded, n_h=1) + + v1 = (A**2 / n) * (0.75 * s2_1 + 0.25 * 0) + v2 = (A**2 / n**2) * ((1 - 0.75) * s2_1 + (1 - 0.25) * 0) + expected = v1 + v2 + + assert abs(result["variance_total"] - expected) < 1.0, ( + f"Expected {expected}, got {result['variance_total']}" + ) + + def test_fallback_without_bp_columns(self): + """ + When B&P columns are absent, should fall back to simplified formula. + """ + from pyfia.estimation.variance import calculate_domain_total_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 1], + "EXPNS": [1000.0, 1000.0, 1000.0, 1000.0], + "y_i": [0.8, 1.0, 0.6, 0.9], + } + ) + + result = calculate_domain_total_variance(plot_data, "y_i") + + # Simplified formula: V = w² × s² × n + s2 = np.var([0.8, 1.0, 0.6, 0.9], ddof=1) + expected = (1000.0**2) * s2 * 4 + + assert abs(result["variance_total"] - expected) < 1e-6 + + def test_grouped_exact_bp_variance(self): + """ + Test grouped variance with exact B&P formula. + + Two species groups in one EU, each with one stratum. + """ + from pyfia.estimation.variance import ( + calculate_grouped_domain_total_variance, + ) + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4", "P5", "P6"], + "STRATUM_CN": [1, 1, 1, 1, 1, 1], + "EXPNS": [2000.0] * 6, + "ESTN_UNIT_CN": [100] * 6, + "STRATUM_WGT": [1.0] * 6, + "AREA_USED": [12000.0] * 6, + "P2POINTCNT": [6.0] * 6, + "y_i": [10.0, 20.0, 30.0, 5.0, 15.0, 25.0], + "SPCD": [100, 100, 100, 200, 200, 200], + "x_i": [1.0] * 6, + } + ) + + result = calculate_grouped_domain_total_variance( + plot_data, + group_cols=["SPCD"], + y_col="y_i", + x_col="x_i", + ) + + assert len(result) == 2 + assert "variance_total" in result.columns + assert "se_total" in result.columns + + # Each group should have positive variance + for row in result.iter_rows(named=True): + assert row["variance_total"] > 0 + assert row["se_total"] > 0 + + @pytest.mark.parametrize("seed", range(5)) + def test_exact_bp_variance_non_negative(self, seed): + """Property test: exact B&P variance must always be non-negative.""" + from pyfia.estimation.variance import calculate_domain_total_variance + + np.random.seed(seed) + n_plots = np.random.randint(2, 15) + n_strata = np.random.randint(1, min(4, n_plots)) + + # Assign plots to strata + strata = np.sort(np.random.choice(range(1, n_strata + 1), size=n_plots)) + stratum_weights = np.random.dirichlet(np.ones(n_strata)) + area_used = np.random.uniform(5000, 50000) + + rows = [] + for i in range(n_plots): + s = strata[i] + n_h = np.sum(strata == s) + rows.append( + { + "PLT_CN": f"P{i}", + "STRATUM_CN": int(s), + "EXPNS": area_used / n_plots, + "ESTN_UNIT_CN": 100, + "STRATUM_WGT": float(stratum_weights[s - 1]), + "AREA_USED": area_used, + "P2POINTCNT": float(n_h), + "y_i": float(np.random.uniform(0, 100)), + } + ) + + plot_data = pl.DataFrame(rows) + result = calculate_domain_total_variance(plot_data, "y_i") + + assert result["variance_total"] >= 0, ( + f"Variance was negative: {result['variance_total']}" + ) + assert result["se_total"] >= 0 + + +# ============================================================================= +# TestRatioOfMeansVariance +# ============================================================================= + + +class TestRatioOfMeansVariance: + """ + Test the ratio-of-means variance formula from Bechtold & Patterson (2005): + + V(R) = (1/X^2) * [V(Y) + R^2 * V(X) - 2*R*Cov(Y,X)] + + Where R = Y/X is the per-acre estimate. + """ + + def test_ratio_variance_hand_calculated(self): + """ + Hand-calculated ratio variance for a single EU, single stratum. + + Setup: + - 4 plots with y = [10, 20, 30, 40], x = [0.8, 0.9, 1.0, 0.7] + - Single stratum, W_h = 1.0, A = 10000, n = 4, P2POINTCNT = 4 + + B&P post-stratified variance uses s² directly (NOT s²/n_h): + V(Y) = (A²/n) × W_h × s²_y = (1e8/4) × 166.6667 = 4,166,666,750 + V(X) = (A²/n) × W_h × s²_x = (1e8/4) × 0.016667 = 416,666.67 + Cov = (A²/n) × W_h × cov_yx = (1e8/4) × 1.0 = 25,000,000 + """ + from pyfia.estimation.variance import calculate_ratio_of_means_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 1], + "EXPNS": [2500.0, 2500.0, 2500.0, 2500.0], + "ESTN_UNIT_CN": [100, 100, 100, 100], + "STRATUM_WGT": [1.0, 1.0, 1.0, 1.0], + "AREA_USED": [10000.0, 10000.0, 10000.0, 10000.0], + "P2POINTCNT": [4.0, 4.0, 4.0, 4.0], + "y_i": [10.0, 20.0, 30.0, 40.0], + "x_i": [0.8, 0.9, 1.0, 0.7], + } + ) + + result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + # Verify all expected keys are present + assert "variance_total" in result + assert "se_total" in result + assert "variance_ratio" in result + assert "se_ratio" in result + assert "total_y" in result + assert "total_x" in result + assert "ratio" in result + + # Verify totals + y_vals = np.array([10, 20, 30, 40]) + x_vals = np.array([0.8, 0.9, 1.0, 0.7]) + expns = 2500.0 + + expected_total_y = expns * y_vals.sum() + expected_total_x = expns * x_vals.sum() + assert abs(result["total_y"] - expected_total_y) < 1.0 + assert abs(result["total_x"] - expected_total_x) < 1.0 + + # Verify ratio + expected_ratio = expected_total_y / expected_total_x + assert abs(result["ratio"] - expected_ratio) < 0.01 + + # Verify variance is positive and ratio variance is computed + assert result["variance_total"] > 0 + assert result["variance_ratio"] > 0 + assert result["se_ratio"] > 0 + + # Hand-calculate the ratio variance + # B&P uses s² directly (NOT s²/n_h) + s2_y = np.var(y_vals, ddof=1) + s2_x = np.var(x_vals, ddof=1) + cov_yx = np.cov(y_vals, x_vals, ddof=1)[0, 1] + area = 10000.0 + n = 4 + + var_y = (area**2 / n) * s2_y + var_x = (area**2 / n) * s2_x + cov_total = (area**2 / n) * cov_yx + + r_hat = expected_ratio + expected_var_ratio = (1.0 / expected_total_x**2) * ( + var_y + r_hat**2 * var_x - 2 * r_hat * cov_total + ) + + assert abs(result["variance_ratio"] - expected_var_ratio) < 0.1, ( + f"Expected V(R)={expected_var_ratio}, got {result['variance_ratio']}" + ) + + def test_constant_x_equals_simple_formula(self): + """ + When all x_i = constant c, V(X)=0 and Cov(Y,X)=0, + so V(R) = V(Y) / X^2, which is the old simplified formula. + """ + from pyfia.estimation.variance import ( + calculate_domain_total_variance, + calculate_ratio_of_means_variance, + ) + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 1], + "EXPNS": [2500.0, 2500.0, 2500.0, 2500.0], + "ESTN_UNIT_CN": [100, 100, 100, 100], + "STRATUM_WGT": [1.0, 1.0, 1.0, 1.0], + "AREA_USED": [10000.0, 10000.0, 10000.0, 10000.0], + "P2POINTCNT": [4.0, 4.0, 4.0, 4.0], + "y_i": [10.0, 20.0, 30.0, 40.0], + "x_i": [1.0, 1.0, 1.0, 1.0], # Constant x + } + ) + + ratio_result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + domain_result = calculate_domain_total_variance(plot_data, "y_i") + + # With constant x, V(X)=0 and Cov=0 + # So V(R) = V(Y) / X^2 + total_x = ratio_result["total_x"] + expected_var_ratio = domain_result["variance_total"] / total_x**2 + expected_se_ratio = domain_result["se_total"] / total_x + + assert abs(ratio_result["variance_ratio"] - expected_var_ratio) < 1e-6, ( + f"Expected {expected_var_ratio}, got {ratio_result['variance_ratio']}" + ) + assert abs(ratio_result["se_ratio"] - expected_se_ratio) < 1e-6, ( + f"Expected {expected_se_ratio}, got {ratio_result['se_ratio']}" + ) + + def test_with_bp_columns_multi_eu(self): + """ + Full B&P ratio variance with 2 EUs and hand-calculated values. + """ + from pyfia.estimation.variance import calculate_ratio_of_means_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4", "P5"], + "STRATUM_CN": [1, 1, 1, 2, 2], + "EXPNS": [2666.67, 2666.67, 2666.67, 6000.0, 6000.0], + "ESTN_UNIT_CN": [100, 100, 100, 200, 200], + "STRATUM_WGT": [1.0, 1.0, 1.0, 1.0, 1.0], + "AREA_USED": [8000.0, 8000.0, 8000.0, 12000.0, 12000.0], + "P2POINTCNT": [3.0, 3.0, 3.0, 2.0, 2.0], + "y_i": [10.0, 20.0, 30.0, 50.0, 70.0], + "x_i": [0.8, 1.0, 0.9, 0.7, 0.95], + } + ) + + result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + # Basic assertions + assert result["variance_total"] > 0 + assert result["variance_ratio"] >= 0 + assert result["se_ratio"] >= 0 + assert result["total_x"] > 0 + assert result["ratio"] > 0 + + def test_positive_covariance_reduces_variance(self): + """ + When Y and X are positively correlated, the ratio variance + should be less than V(Y)/X^2 (the old simplified formula). + + This is because V(R) = (1/X^2) * [V(Y) + R^2*V(X) - 2*R*Cov] + and the -2*R*Cov term reduces variance when Cov > 0. + """ + from pyfia.estimation.variance import calculate_ratio_of_means_variance + + # Create data where y and x are positively correlated + # High x -> high y (more forest area -> more volume) + plot_data = pl.DataFrame( + { + "PLT_CN": [f"P{i}" for i in range(6)], + "STRATUM_CN": [1] * 6, + "EXPNS": [2000.0] * 6, + "ESTN_UNIT_CN": [100] * 6, + "STRATUM_WGT": [1.0] * 6, + "AREA_USED": [12000.0] * 6, + "P2POINTCNT": [6.0] * 6, + "y_i": [5.0, 15.0, 25.0, 35.0, 45.0, 55.0], + "x_i": [0.3, 0.5, 0.7, 0.8, 0.9, 1.0], + } + ) + + result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + # Calculate the old simplified formula: V(Y) / X^2 + old_se_acre = result["se_total"] / result["total_x"] + old_var_acre = old_se_acre**2 + + # Ratio variance should be smaller due to positive covariance + assert result["variance_ratio"] < old_var_acre, ( + f"Ratio variance {result['variance_ratio']} should be < " + f"simplified {old_var_acre} for positively correlated data" + ) + + def test_grouped_ratio_variance(self): + """ + Test grouped ratio variance with two species groups. + """ + from pyfia.estimation.variance import ( + calculate_grouped_domain_total_variance, + ) + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4", "P5", "P6"], + "STRATUM_CN": [1, 1, 1, 1, 1, 1], + "EXPNS": [2000.0] * 6, + "ESTN_UNIT_CN": [100] * 6, + "STRATUM_WGT": [1.0] * 6, + "AREA_USED": [12000.0] * 6, + "P2POINTCNT": [6.0] * 6, + "y_i": [10.0, 20.0, 30.0, 5.0, 15.0, 25.0], + "x_i": [0.8, 0.9, 1.0, 0.6, 0.7, 0.85], + "SPCD": [100, 100, 100, 200, 200, 200], + } + ) + + result = calculate_grouped_domain_total_variance( + plot_data, + group_cols=["SPCD"], + y_col="y_i", + x_col="x_i", + ) + + assert len(result) == 2 + assert "variance_acre" in result.columns + assert "se_acre" in result.columns + assert "variance_total" in result.columns + assert "se_total" in result.columns + + # Each group should have positive variance + for row in result.iter_rows(named=True): + assert row["variance_total"] > 0 + assert row["se_total"] > 0 + assert row["se_acre"] > 0 + assert row["variance_acre"] > 0 + + def test_fallback_without_bp_columns(self): + """ + Ratio variance should work with simplified formula when B&P columns absent. + """ + from pyfia.estimation.variance import calculate_ratio_of_means_variance + + plot_data = pl.DataFrame( + { + "PLT_CN": ["P1", "P2", "P3", "P4"], + "STRATUM_CN": [1, 1, 1, 1], + "EXPNS": [1000.0, 1000.0, 1000.0, 1000.0], + "y_i": [10.0, 20.0, 30.0, 40.0], + "x_i": [0.8, 0.9, 1.0, 0.7], + } + ) + + result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + # Should still compute valid results + assert result["variance_total"] > 0 + assert result["se_total"] > 0 + assert result["variance_ratio"] >= 0 + assert result["se_ratio"] >= 0 + assert result["total_y"] > 0 + assert result["total_x"] > 0 + + @pytest.mark.parametrize("seed", range(10)) + def test_ratio_variance_non_negative(self, seed): + """Property test: ratio variance should never be negative.""" + from pyfia.estimation.variance import calculate_ratio_of_means_variance + + np.random.seed(seed) + n_plots = np.random.randint(3, 15) + + plot_data = pl.DataFrame( + { + "PLT_CN": [f"P{i}" for i in range(n_plots)], + "STRATUM_CN": [1] * n_plots, + "EXPNS": [np.random.uniform(500, 3000)] * n_plots, + "ESTN_UNIT_CN": [100] * n_plots, + "STRATUM_WGT": [1.0] * n_plots, + "AREA_USED": [np.random.uniform(5000, 50000)] * n_plots, + "P2POINTCNT": [float(n_plots)] * n_plots, + "y_i": np.random.uniform(0, 100, n_plots).tolist(), + "x_i": np.random.uniform(0.1, 1.0, n_plots).tolist(), + } + ) + + result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + assert result["variance_ratio"] >= 0, ( + f"Ratio variance was negative: {result['variance_ratio']}" + ) + assert result["se_ratio"] >= 0 + + def test_ungrouped_fallback_uses_ratio_variance(self): + """ + Test that calculate_grouped_domain_total_variance with no group_cols + uses ratio variance instead of simple se_total/total_x. + """ + from pyfia.estimation.variance import ( + calculate_grouped_domain_total_variance, + calculate_ratio_of_means_variance, + ) + + # Data with positive correlation between y and x + plot_data = pl.DataFrame( + { + "PLT_CN": [f"P{i}" for i in range(5)], + "STRATUM_CN": [1] * 5, + "EXPNS": [2000.0] * 5, + "ESTN_UNIT_CN": [100] * 5, + "STRATUM_WGT": [1.0] * 5, + "AREA_USED": [10000.0] * 5, + "P2POINTCNT": [5.0] * 5, + "y_i": [5.0, 15.0, 25.0, 35.0, 45.0], + "x_i": [0.3, 0.5, 0.7, 0.8, 1.0], + } + ) + + # No valid group cols -> falls back to scalar path + grouped_result = calculate_grouped_domain_total_variance( + plot_data, + group_cols=["NONEXISTENT_COL"], + y_col="y_i", + x_col="x_i", + ) + + # Should match scalar ratio variance + scalar_result = calculate_ratio_of_means_variance(plot_data, "y_i", "x_i") + + assert abs(grouped_result["se_acre"][0] - scalar_result["se_ratio"]) < 1e-10 + assert abs(grouped_result["se_total"][0] - scalar_result["se_total"]) < 1e-10