Skip to content

Commit

Permalink
convert the hh seed process to polars
Browse files Browse the repository at this point in the history
  • Loading branch information
bobkatla committed Jul 14, 2024
1 parent c6bc2c2 commit 09fe8b9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 44 deletions.
13 changes: 11 additions & 2 deletions PopSynthesis/DataProcessor/DataProcessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
processed_data_dir,
output_dir,
)
from PopSynthesis.DataProcessor.utils.const_process import HH_ATTS, LS_GR_RELA, LS_HH_INC
from PopSynthesis.DataProcessor.utils.general_utils import find_file
from PopSynthesis.DataProcessor.utils.seed.hh.process_general_hh import convert_hh_totvehs, convert_hh_size, convert_hh_dwell, convert_hh_inc
import polars as pl


class DataProcessorGeneric:
Expand All @@ -31,7 +34,12 @@ def process_all_seed(self):
def process_households_seed(self):
# Import the hh seed data
hh_file = find_file(base_path=self.raw_data_path, filename=hh_seed_file)
print(hh_file)
raw_hh_seed = pl.read_csv(hh_file)
hh_df = convert_hh_totvehs(raw_hh_seed)
hh_df = convert_hh_inc(hh_df, check_states=LS_HH_INC)
hh_df = convert_hh_dwell(hh_df)
hh_df = convert_hh_size(hh_df)
return hh_df

def process_persons_seed(self):
NotImplemented
Expand All @@ -47,4 +55,5 @@ def process_persons_census(self):


if __name__ == "__main__":
a = DataProcessorGeneric()
a = DataProcessorGeneric(raw_data_dir, processed_data_dir, output_dir)
a.process_households_seed()
84 changes: 42 additions & 42 deletions PopSynthesis/DataProcessor/utils/seed/hh/process_general_hh.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,56 @@
def convert_hh_totvehs(hh_df, veh_limit=4):
def convert_veh(row):
if row["totalvehs"] < veh_limit:
return str(row["totalvehs"])
else:
return f"{veh_limit}+"
import polars as pl


hh_df["totalvehs"] = hh_df.apply(convert_veh, axis=1)
def convert_hh_totvehs(hh_df: pl.DataFrame, veh_limit=4):
# Define the conditional operation
def convert_veh(col, veh_limit):
return pl.when(col < veh_limit).then(col.cast(pl.Utf8)).otherwise(pl.lit(f"{veh_limit}+"))

hh_df = hh_df.with_columns(
convert_veh(pl.col("totalvehs"), veh_limit).alias("totalvehs")
)
return hh_df


def convert_hh_inc(hh_df, check_states):
def con_inc(row):
hh_inc = row["hhinc"]
# Confime hhinc always exist, it's float
if hh_inc < 0:
return "Negative income" # NOTE: None like this but exist in census, need to check whether this can be an issue
elif hh_inc > 0:
for state in check_states:
bool_val = None
if "$" in state:
state = state.replace(",", "").replace("$", "")
if "more" in state:
val = state.split(" ")[0]
bool_val = hh_inc >= int(val)
state = val + "+"
elif "-" in state:
state = state.split(" ")[0]
a, b = state.split("-")
bool_val = hh_inc >= int(a) and hh_inc <= int(b)
else:
raise ValueError(f"Dunno I never seen this lol {state}")
if bool_val:
return state
# Note there can be null
hhinc_col = pl.col("hhinc")

# Base expression
expr = pl.when(hhinc_col < 0).then(pl.lit("Negative income"))
expr = expr.when(hhinc_col == 0).then(pl.lit("Nil income"))

# Generate conditions and results for each state in check_states
for state in check_states:
state_clean = state.replace(",", "").replace("$", "").split(" ")[0]
if "more" in state:
val = int(state_clean)
expr = expr.when(hhinc_col >= val).then(pl.lit(f"{val}+"))
elif "-" in state:
a, b = map(int, state_clean.split("-"))
expr = expr.when((hhinc_col >= a) & (hhinc_col <= b)).then(pl.lit(f"{a}-{b}"))
else:
return "Nil income"
raise ValueError(f"Dunno I never seen this lol {state}")

# Final otherwise to retain the original value if no conditions match
expr = expr.otherwise(hhinc_col)

# Apply the transformation
hh_df = hh_df.with_columns(expr.alias("hhinc"))

hh_df["hhinc"] = hh_df.apply(con_inc, axis=1)
return hh_df



def convert_hh_dwell(hh_df): # Removing the occupied rent free
hh_df["owndwell"] = hh_df.apply(
lambda r: "Something Else"
if r["owndwell"] == "Occupied Rent-Free"
else r["owndwell"],
axis=1,
)
def convert_hh_dwell(hh_df: pl.DataFrame): # Removing the occupied rent free
col_owndwell = pl.col("owndwell")
expr = pl.when(col_owndwell=="Occupied Rent-Free").then(pl.lit("Something Else")).otherwise(col_owndwell)
hh_df = hh_df.with_columns(expr.alias("owndwell"))
return hh_df


def convert_hh_size(hh_df):
hh_df["hhsize"] = hh_df.apply(
lambda r: "8+" if r["hhsize"] >= 8 else str(r["hhsize"]), axis=1
)
col_hhsz = pl.col("hhsize")
max_hhsz = 8 # const, based on census
expr = pl.when(col_hhsz >= max_hhsz).then(pl.lit(f"{max_hhsz}+")).otherwise(col_hhsz.cast(pl.Utf8))
hh_df = hh_df.with_columns(expr.alias("hhsize"))
return hh_df

0 comments on commit 09fe8b9

Please sign in to comment.