From 09fe8b9c29fb5a1fa3d06f51d0b537b562866835 Mon Sep 17 00:00:00 2001 From: Duc Minh La Date: Sun, 14 Jul 2024 19:13:57 +1000 Subject: [PATCH] convert the hh seed process to polars --- PopSynthesis/DataProcessor/DataProcessor.py | 13 ++- .../utils/seed/hh/process_general_hh.py | 84 +++++++++---------- 2 files changed, 53 insertions(+), 44 deletions(-) diff --git a/PopSynthesis/DataProcessor/DataProcessor.py b/PopSynthesis/DataProcessor/DataProcessor.py index 7e64c34..0baa1d9 100644 --- a/PopSynthesis/DataProcessor/DataProcessor.py +++ b/PopSynthesis/DataProcessor/DataProcessor.py @@ -11,7 +11,10 @@ processed_data_dir, output_dir, ) +from PopSynthesis.DataProcessor.utils.const_process import HH_ATTS, LS_GR_RELA, LS_HH_INC from PopSynthesis.DataProcessor.utils.general_utils import find_file +from PopSynthesis.DataProcessor.utils.seed.hh.process_general_hh import convert_hh_totvehs, convert_hh_size, convert_hh_dwell, convert_hh_inc +import polars as pl class DataProcessorGeneric: @@ -31,7 +34,12 @@ def process_all_seed(self): def process_households_seed(self): # Import the hh seed data hh_file = find_file(base_path=self.raw_data_path, filename=hh_seed_file) - print(hh_file) + raw_hh_seed = pl.read_csv(hh_file) + hh_df = convert_hh_totvehs(raw_hh_seed) + hh_df = convert_hh_inc(hh_df, check_states=LS_HH_INC) + hh_df = convert_hh_dwell(hh_df) + hh_df = convert_hh_size(hh_df) + return hh_df def process_persons_seed(self): NotImplemented @@ -47,4 +55,5 @@ def process_persons_census(self): if __name__ == "__main__": - a = DataProcessorGeneric() + a = DataProcessorGeneric(raw_data_dir, processed_data_dir, output_dir) + a.process_households_seed() diff --git a/PopSynthesis/DataProcessor/utils/seed/hh/process_general_hh.py b/PopSynthesis/DataProcessor/utils/seed/hh/process_general_hh.py index 32ce50f..a655e4e 100644 --- a/PopSynthesis/DataProcessor/utils/seed/hh/process_general_hh.py +++ b/PopSynthesis/DataProcessor/utils/seed/hh/process_general_hh.py @@ -1,56 +1,56 @@ -def convert_hh_totvehs(hh_df, veh_limit=4): - def convert_veh(row): - if row["totalvehs"] < veh_limit: - return str(row["totalvehs"]) - else: - return f"{veh_limit}+" +import polars as pl + - hh_df["totalvehs"] = hh_df.apply(convert_veh, axis=1) +def convert_hh_totvehs(hh_df: pl.DataFrame, veh_limit=4): + # Define the conditional operation + def convert_veh(col, veh_limit): + return pl.when(col < veh_limit).then(col.cast(pl.Utf8)).otherwise(pl.lit(f"{veh_limit}+")) + + hh_df = hh_df.with_columns( + convert_veh(pl.col("totalvehs"), veh_limit).alias("totalvehs") + ) return hh_df def convert_hh_inc(hh_df, check_states): - def con_inc(row): - hh_inc = row["hhinc"] - # Confime hhinc always exist, it's float - if hh_inc < 0: - return "Negative income" # NOTE: None like this but exist in census, need to check whether this can be an issue - elif hh_inc > 0: - for state in check_states: - bool_val = None - if "$" in state: - state = state.replace(",", "").replace("$", "") - if "more" in state: - val = state.split(" ")[0] - bool_val = hh_inc >= int(val) - state = val + "+" - elif "-" in state: - state = state.split(" ")[0] - a, b = state.split("-") - bool_val = hh_inc >= int(a) and hh_inc <= int(b) - else: - raise ValueError(f"Dunno I never seen this lol {state}") - if bool_val: - return state + # Note there can be null + hhinc_col = pl.col("hhinc") + + # Base expression + expr = pl.when(hhinc_col < 0).then(pl.lit("Negative income")) + expr = expr.when(hhinc_col == 0).then(pl.lit("Nil income")) + + # Generate conditions and results for each state in check_states + for state in check_states: + state_clean = state.replace(",", "").replace("$", "").split(" ")[0] + if "more" in state: + val = int(state_clean) + expr = expr.when(hhinc_col >= val).then(pl.lit(f"{val}+")) + elif "-" in state: + a, b = map(int, state_clean.split("-")) + expr = expr.when((hhinc_col >= a) & (hhinc_col <= b)).then(pl.lit(f"{a}-{b}")) else: - return "Nil income" + raise ValueError(f"Dunno I never seen this lol {state}") + + # Final otherwise to retain the original value if no conditions match + expr = expr.otherwise(hhinc_col) + + # Apply the transformation + hh_df = hh_df.with_columns(expr.alias("hhinc")) - hh_df["hhinc"] = hh_df.apply(con_inc, axis=1) return hh_df + - -def convert_hh_dwell(hh_df): # Removing the occupied rent free - hh_df["owndwell"] = hh_df.apply( - lambda r: "Something Else" - if r["owndwell"] == "Occupied Rent-Free" - else r["owndwell"], - axis=1, - ) +def convert_hh_dwell(hh_df: pl.DataFrame): # Removing the occupied rent free + col_owndwell = pl.col("owndwell") + expr = pl.when(col_owndwell=="Occupied Rent-Free").then(pl.lit("Something Else")).otherwise(col_owndwell) + hh_df = hh_df.with_columns(expr.alias("owndwell")) return hh_df def convert_hh_size(hh_df): - hh_df["hhsize"] = hh_df.apply( - lambda r: "8+" if r["hhsize"] >= 8 else str(r["hhsize"]), axis=1 - ) + col_hhsz = pl.col("hhsize") + max_hhsz = 8 # const, based on census + expr = pl.when(col_hhsz >= max_hhsz).then(pl.lit(f"{max_hhsz}+")).otherwise(col_hhsz.cast(pl.Utf8)) + hh_df = hh_df.with_columns(expr.alias("hhsize")) return hh_df