diff --git a/.gitignore b/.gitignore
index bde32a7..1f259fd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
 .env*
+.local.env
 service-account-key-base64.txt
 compostable-428115-5dde0b40960b.json
 data/*
diff --git a/scripts/pipeline-template.py b/scripts/pipeline-template.py
index 5edb2c1..492e3bc 100644
--- a/scripts/pipeline-template.py
+++ b/scripts/pipeline-template.py
@@ -29,7 +29,9 @@
     "% Residuals (Area)",
 ]
 
-ITEMS_PATH = DATA_DIR / "CFTP Test Item Inventory with Dimensions - All Trials.xlsx"
+ITEMS_PATH = (
+    DATA_DIR / "CFTP Test Item Inventory with Dimensions - All Trials.xlsx"
+)
 EXTRA_ITEMS_PATH = DATA_DIR / "Item IDS for CASP004 CASP003.xlsx"
 
 df_items = pd.read_excel(ITEMS_PATH, sheet_name=0, skiprows=3)
@@ -42,7 +44,10 @@
 OUTLIER_THRESHOLD = 10
 
 item2id = {
-    key.strip(): value for key, value in df_items.set_index("Item Description Refined")["Item ID"].to_dict().items()
+    key.strip(): value
+    for key, value in df_items.set_index("Item Description Refined")["Item ID"]
+    .to_dict()
+    .items()
 }
 
 extra_items = pd.read_excel(EXTRA_ITEMS_PATH)
@@ -74,7 +79,9 @@ def map_technology(trial_id: str) -> str:
     return "Unknown"
 
 
-TRIALS_PATH = DATA_DIR / "CFTP Anonymized Data Compilation Overview - For Sharing.xlsx"
+TRIALS_PATH = (
+    DATA_DIR / "CFTP Anonymized Data Compilation Overview - For Sharing.xlsx"
+)
 
 df_trials = pd.read_excel(TRIALS_PATH, skiprows=3)
 trial2id = {
@@ -100,9 +107,13 @@ def map_technology(trial_id: str) -> str:
     "Facility 10": "WR005-01",
 }
 
-OPERATING_CONDITIONS_PATH = DATA_DIR / "Donated Data 2023 - Compiled Facility Conditions for DSI.xlsx"
+OPERATING_CONDITIONS_PATH = (
+    DATA_DIR / "Donated Data 2023 - Compiled Facility Conditions for DSI.xlsx"
+)
 
-df_temps = pd.read_excel(OPERATING_CONDITIONS_PATH, sheet_name=3, skiprows=1, index_col="Day #")
+df_temps = pd.read_excel(
+    OPERATING_CONDITIONS_PATH, sheet_name=3, skiprows=1, index_col="Day #"
+)
 df_temps.columns = [trial2id[col.replace("*", "")] for col in df_temps.columns]
 df_temps_avg = df_temps.mean().to_frame("Average Temperature (F)")
 df_temps["Operating Condition"] = "Temperature"
@@ -113,29 +124,44 @@ def map_technology(trial_id: str) -> str:
     sheet_name=2,
     skiprows=3,
 )
-df_trial_duration.columns = [col.replace("\n", "").strip() for col in df_trial_duration.columns]
-df_trial_duration = df_trial_duration[["Facility Designation", "Endpoint Analysis (trial length)"]].rename(
+df_trial_duration.columns = [
+    col.replace("\n", "").strip() for col in df_trial_duration.columns
+]
+df_trial_duration = df_trial_duration[
+    ["Facility Designation", "Endpoint Analysis (trial length)"]
+].rename(
     columns={
         "Facility Designation": "Trial ID",
         "Endpoint Analysis (trial length)": "Trial Duration",
     }
 )
 df_trial_duration["Trial ID"] = (
-    df_trial_duration["Trial ID"].str.replace("( ", "(", regex=False).str.replace(" )", ")", regex=False).map(trial2id)
+    df_trial_duration["Trial ID"]
+    .str.replace("( ", "(", regex=False)
+    .str.replace(" )", ")", regex=False)
+    .map(trial2id)
 )
 df_trial_duration = df_trial_duration.set_index("Trial ID")
 
-df_moisture = pd.read_excel(OPERATING_CONDITIONS_PATH, sheet_name=4, skiprows=1, index_col="Week")
+df_moisture = pd.read_excel(
+    OPERATING_CONDITIONS_PATH, sheet_name=4, skiprows=1, index_col="Week"
+)
 # Filter out rows with non-numeric week values
 df_moisture = df_moisture.reset_index()
-df_moisture = df_moisture[pd.to_numeric(df_moisture["Week"], errors="coerce").notna()]
+df_moisture = df_moisture[
+    pd.to_numeric(df_moisture["Week"], errors="coerce").notna()
+]
 df_moisture = df_moisture.set_index("Week")
-df_moisture.columns = [trial2id[col.replace("*", "")] for col in df_moisture.columns]
+df_moisture.columns = [
+    trial2id[col.replace("*", "")] for col in df_moisture.columns
+]
 df_moisture_avg = df_moisture.mean().to_frame("Average % Moisture (In Field)")
 df_moisture["Operating Condition"] = "Moisture"
 df_moisture["Time Unit"] = "Week"
 
-df_o2 = pd.read_excel(OPERATING_CONDITIONS_PATH, sheet_name=6, skiprows=1, index_col="Week")
+df_o2 = pd.read_excel(
+    OPERATING_CONDITIONS_PATH, sheet_name=6, skiprows=1, index_col="Week"
+)
 df_o2 = df_o2.reset_index()
 df_o2 = df_o2[pd.to_numeric(df_o2["Week"], errors="coerce").notna()]
 df_o2 = df_o2.set_index("Week")
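Reviewer note: each operating-conditions sheet above goes through the same steps: read the sheet, map facility columns to trial IDs via trial2id, then average over time. A minimal sketch of what the .mean().to_frame(...) step yields, on toy data rather than the real workbook:

    import pandas as pd

    # Toy stand-in for one conditions sheet: rows are time steps, columns
    # are facility columns already mapped to trial IDs (made-up values).
    df = pd.DataFrame(
        {"WR005-01": [131.0, 140.0], "CASP004-01": [122.0, 118.0]},
        index=pd.Index([1, 2], name="Day #"),
    )
    # .mean() averages each column over time; .to_frame() names the result,
    # giving one row per trial ID, ready for the later concat.
    print(df.mean().to_frame("Average Temperature (F)"))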
@@ -143,7 +169,9 @@ def map_technology(trial_id: str) -> str:
 df_o2["Operating Condition"] = "Oxygen"
 df_o2["Time Unit"] = "Week"
 
-df_operating_conditions_avg = pd.concat([df_trial_duration, df_temps_avg, df_moisture_avg], axis=1)
+df_operating_conditions_avg = pd.concat(
+    [df_trial_duration, df_temps_avg, df_moisture_avg], axis=1
+)
 
 processed_data = []
@@ -189,16 +217,24 @@ def __init__(
         filename = self.data_filepath.stem
         self.trial_name = trial_name
         self.trials = trials
-        file_suffix = f"_{trial_name}_clean.csv" if self.trial_name else "_clean.csv"
-        self.output_filepath = self.data_filepath.with_name(filename + file_suffix)
+        file_suffix = (
+            f"_{trial_name}_clean.csv" if self.trial_name else "_clean.csv"
+        )
+        self.output_filepath = self.data_filepath.with_name(
+            filename + file_suffix
+        )
         # TODO: This is kind of messy and could probably be better
-        self.raw_data = self.load_data(data_filepath, sheet_name=sheet_name, skiprows=skiprows)
+        self.raw_data = self.load_data(
+            data_filepath, sheet_name=sheet_name, skiprows=skiprows
+        )
         self.items = items
         self.item2id = item2id
 
     @abstractmethod
-    def load_data(self, data_filepath: Path, sheet_name: int = 0, skip_rows: int = 0) -> pd.DataFrame:
+    def load_data(
+        self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0
+    ) -> pd.DataFrame:
         """Loads data from the specified file.
 
         This method should be implemented by subclasses to load data from the
@@ -262,7 +298,9 @@ def merge_with_trials(self, data: pd.DataFrame) -> pd.DataFrame:
         Returns:
             Data merged with trial information.
         """
-        return data.merge(self.trials, left_on="Trial ID", right_on="Public Trial ID")
+        return data.merge(
+            self.trials, left_on="Trial ID", right_on="Public Trial ID"
+        )
 
     def run(self, save: bool = False) -> pd.DataFrame:
         """Runs the data pipeline.
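Reviewer note: the axis=1 concat above relies on all three frames sharing the Trial ID index; rows align by index and trials missing from one input get NaN. A small illustration with hypothetical values:

    import pandas as pd

    # Toy frames indexed by Trial ID, mirroring df_trial_duration and
    # df_temps_avg; the numbers are made up.
    duration = pd.DataFrame({"Trial Duration": [45]}, index=["WR005-01"])
    temps = pd.DataFrame(
        {"Average Temperature (F)": [131.0, 122.0]},
        index=["WR005-01", "CASP004-01"],
    )
    # axis=1 aligns on the index; CASP004-01 gets NaN for Trial Duration.
    print(pd.concat([duration, temps], axis=1))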
@@ -294,7 +332,9 @@
 class NewTemplatePipeline(AbstractDataPipeline):
     """Pipeline for processing data from the new template."""
 
-    def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0) -> pd.DataFrame:
+    def load_data(
+        self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0
+    ) -> pd.DataFrame:
         """Loads data from the specified CSV file.
 
         Args:
@@ -308,7 +348,8 @@ def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0)
         # Read the CSV file into a DataFrame
         data = pd.read_csv(data_filepath)
 
-        # Find the index of the first completely empty row — formatted so there's comments below the data
+        # Find the index of the first completely empty row — the sheet
+        # has comments below the data
         first_empty_row_index = data[data.isna().all(axis=1)].index.min()
 
         # If an empty row is found, drop all rows below it
@@ -331,13 +372,22 @@ def preprocess_data(self, data):
                 "Trial": "Trial ID",
             }
         )
-        percentage_cols = ["% Residuals (Dry Weight)", "% Residuals (Wet Weight)", "% Residuals (Area)"]
+        percentage_cols = [
+            "% Residuals (Dry Weight)",
+            "% Residuals (Wet Weight)",
+            "% Residuals (Area)",
+        ]
         data[percentage_cols] = data[percentage_cols].replace("no data", np.nan)
-        # TODO: Depending on how the data actually comes in, maybe we don't want to do it this way?
-        data[percentage_cols] = data[percentage_cols].replace("%", "", regex=True).astype(float) / 100
+        # TODO: Depending on how the data comes in, we may not want to do it this way?
+        data[percentage_cols] = (
+            data[percentage_cols].replace("%", "", regex=True).astype(float)
+            / 100
+        )
         # Prefer dry weight to wet weight if available
-        data["% Residuals (Mass)"] = data["% Residuals (Dry Weight)"].fillna(data["% Residuals (Wet Weight)"])
+        data["% Residuals (Mass)"] = data["% Residuals (Dry Weight)"].fillna(
+            data["% Residuals (Wet Weight)"]
+        )
 
         return data
@@ -350,7 +400,9 @@ def join_with_items(self, data):
         Returns:
             The joined data
         """
-        return self.items.drop_duplicates(subset="Item Name").merge(data, on="Item Name")
+        return self.items.drop_duplicates(subset="Item Name").merge(
+            data, on="Item Name"
+        )
 
     def merge_with_trials(self, data):
         """Join with the trials table
@@ -366,12 +418,19 @@ def merge_with_trials(self, data):
             "Test Method": "Mesh Bag",
             "Technology": "Windrow",
         }
-        self.trials = pd.concat([self.trials, pd.DataFrame(dummy_trial, index=[0])], ignore_index=True)
+        self.trials = pd.concat(
+            [self.trials, pd.DataFrame(dummy_trial, index=[0])],
+            ignore_index=True,
+        )
         return data.merge(self.trials, on="Trial ID")
 
 
-NEW_TEMPLATE_PATH = DATA_DIR / "CFTP_DisintegrationDataInput_Template_sept92024.csv"
-new_template_pipeline = NewTemplatePipeline(NEW_TEMPLATE_PATH, trial_name="Dummy Data for New Template")
+NEW_TEMPLATE_PATH = (
+    DATA_DIR / "CFTP_DisintegrationDataInput_Template_sept92024.csv"
+)
+new_template_pipeline = NewTemplatePipeline(
+    NEW_TEMPLATE_PATH, trial_name="Dummy Data for New Template"
+)
 # TODO: This is commented out so we don't add the dummy data to the "real" data
 # processed_data.append(new_template_pipeline.run())
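Reviewer note: the percentage cleanup in NewTemplatePipeline.preprocess_data is worth seeing end to end. A toy run, assuming values arrive as strings like "12%" or "no data":

    import numpy as np
    import pandas as pd

    col = "% Residuals (Dry Weight)"
    df = pd.DataFrame({col: ["12%", "no data", "0%"]})
    # "no data" becomes NaN, then "%" is stripped, the strings are cast to
    # float, and the result is scaled so 12% is stored as the fraction 0.12.
    df[col] = df[col].replace("no data", np.nan)
    df[col] = df[col].replace("%", "", regex=True).astype(float) / 100
    print(df[col].tolist())  # [0.12, nan, 0.0]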
@@ -387,22 +446,28 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
             **kwargs: Arbitrary keyword arguments.
         """
         super().__init__(*args, **kwargs)
-        # We are using the start weight specific to this trial so drop the Start Weight column
+        # We are using the start weight specific to this trial
+        # so drop the Start Weight column
         # Start weight is set in preprocess_data
         self.items = self.items.drop("Start Weight", axis=1)
 
-    def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0) -> pd.DataFrame:
+    def load_data(
+        self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0
+    ) -> pd.DataFrame:
         """Loads data from the specified Excel file.
 
         Args:
             data_filepath (Path): Path to the data file.
             sheet_name (int, optional): Sheet name or index to load. Defaults to 0.
-            skiprows (int, optional): Number of rows to skip at the start of the file. Defaults to 0.
+            skiprows (int, optional): Number of rows to skip at the start
+                of the file. Defaults to 0.
 
         Returns:
             Loaded data.
         """
-        return pd.read_excel(data_filepath, sheet_name=sheet_name, skiprows=skiprows)
+        return pd.read_excel(
+            data_filepath, sheet_name=sheet_name, skiprows=skiprows
+        )
 
     def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
         """Preprocesses the data.
@@ -429,22 +494,34 @@ def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
         data["Trial"] = data["Trial Id"]
 
         # Take the average of the three weight observations
-        data["End Weight"] = data[["Weight 1", "Weight 2", "Weight 3"]].mean(axis=1)
+        data["End Weight"] = data[["Weight 1", "Weight 2", "Weight 3"]].mean(
+            axis=1
+        )
         # Null values mean the item fully disintegrated
         data["End Weight"] = data["End Weight"].fillna(0)
 
         # Ok...we need to do some weird items work arounds here...this might work?
-        casp004_items = pd.read_excel(self.data_filepath, sheet_name=2).drop_duplicates(subset=["Item Name"])
-        casp004_weights = casp004_items.set_index("Item Name")["Weight (average)"].to_dict()
+        casp004_items = pd.read_excel(
+            self.data_filepath, sheet_name=2
+        ).drop_duplicates(subset=["Item Name"])
+        casp004_weights = casp004_items.set_index("Item Name")[
+            "Weight (average)"
+        ].to_dict()
         data["Start Weight"] = data["Product Name"].map(casp004_weights)
         # rename so this matches the other trials
         data["Item Description Refined"] = data["Product Name"]
         # TODO: Some of this should be in the abstract method...
-        data["Item ID"] = data["Item Description Refined"].str.strip().map(self.item2id)
+        data["Item ID"] = (
+            data["Item Description Refined"].str.strip().map(self.item2id)
+        )
         # Prevent duplicate columns when merging with items
-        data = data.rename(columns={"Item Description Refined": "Item Description Refined (Trial)"})
+        data = data.rename(
+            columns={
+                "Item Description Refined": "Item Description Refined (Trial)"
+            }
+        )
         data["Trial ID"] = "CASP004-01"
         if data["Item ID"].isna().sum() > 0:
             raise ValueError("There are null items after mapping")
@@ -463,7 +540,9 @@ def calculate_results(self, data: pd.DataFrame) -> pd.DataFrame:
         Returns:
             Data with calculated results.
         """
-        data["End Weight"] = data[["Weight 1", "Weight 2", "Weight 3"]].mean(axis=1)
+        data["End Weight"] = data[["Weight 1", "Weight 2", "Weight 3"]].mean(
+            axis=1
+        )
         data["End Weight"] = data["End Weight"].fillna(0)
 
         data["% Residuals (Area)"] = None
@@ -471,8 +550,13 @@ def calculate_results(self, data: pd.DataFrame) -> pd.DataFrame:
         return data
 
 
-CASP004_PATH = DATA_DIR / "CASP004-01 - Results Pre-Processed for Analysis from PDF Tables.xlsx"
-casp004_pipeline = CASP004Pipeline(CASP004_PATH, sheet_name=1, trial_name="casp004")
+CASP004_PATH = (
+    DATA_DIR
+    / "CASP004-01 - Results Pre-Processed for Analysis from PDF Tables.xlsx"
+)
+casp004_pipeline = CASP004Pipeline(
+    CASP004_PATH, sheet_name=1, trial_name="casp004"
+)
 processed_data.append(casp004_pipeline.run())
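Reviewer note: CASP004's end-weight rule (average the three observations; an all-null row means the item fully disintegrated) in miniature, with made-up weights:

    import numpy as np
    import pandas as pd

    df = pd.DataFrame(
        {
            "Weight 1": [10.0, np.nan],
            "Weight 2": [12.0, np.nan],
            "Weight 3": [11.0, np.nan],
        }
    )
    # mean(axis=1) skips NaN within a row; an all-NaN row yields NaN, which
    # fillna(0) then records as complete disintegration.
    df["End Weight"] = df[["Weight 1", "Weight 2", "Weight 3"]].mean(axis=1)
    df["End Weight"] = df["End Weight"].fillna(0)
    print(df["End Weight"].tolist())  # [11.0, 0.0]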
@@ -521,7 +605,9 @@ def melt_trial(self, data: pd.DataFrame, value_name: str) -> pd.DataFrame:
             .reset_index(drop=True)
         )
 
-    def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0) -> pd.DataFrame:
+    def load_data(
+        self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0
+    ) -> pd.DataFrame:
         """Loads data from the specified Excel file.
 
         Args:
@@ -562,26 +648,38 @@ def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame:
         return data
 
 
-TEN_TRIALS_PATH = DATA_DIR / "Donated Data 2023 - Compiled Field Results for DSI.xlsx"
-closed_loop_pipeline = ClosedLoopPipeline(TEN_TRIALS_PATH, trial_name="closed_loop")
+TEN_TRIALS_PATH = (
+    DATA_DIR / "Donated Data 2023 - Compiled Field Results for DSI.xlsx"
+)
+closed_loop_pipeline = ClosedLoopPipeline(
+    TEN_TRIALS_PATH, trial_name="closed_loop"
+)
 processed_data.append(closed_loop_pipeline.run())
 
 
 class PDFPipeline(AbstractDataPipeline):
     """Pipeline for processing PDF trial data."""
 
-    def __init__(self, *args: Any, weight_col: str = "Residual Weight - Oven-dry", **kwargs: Any) -> None:
+    def __init__(
+        self,
+        *args: Any,
+        weight_col: str = "Residual Weight - Oven-dry",
+        **kwargs: Any,
+    ) -> None:
         """Initializes the PDFPipeline with the given parameters.
 
         Args:
             *args: Arbitrary non-keyword arguments.
-            weight_col: Column name for the residual weight. Defaults to "Residual Weight - Oven-dry".
+            weight_col: Column name for the residual weight.
+                Defaults to "Residual Weight - Oven-dry".
             **kwargs: Arbitrary keyword arguments.
         """
         super().__init__(*args, **kwargs)
         self.weight_col = weight_col
 
-    def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0) -> pd.DataFrame:
+    def load_data(
+        self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0
+    ) -> pd.DataFrame:
         """Loads data from the specified Excel file.
 
         Args:
@@ -592,7 +690,9 @@ def load_data(self, data_filepath: Path, sheet_name: int = 0, skiprows: int = 0)
         Returns:
             Loaded data.
         """
-        return pd.read_excel(data_filepath, sheet_name=sheet_name, skiprows=skiprows)
+        return pd.read_excel(
+            data_filepath, sheet_name=sheet_name, skiprows=skiprows
+        )
 
     def join_with_items(self, data: pd.DataFrame) -> pd.DataFrame:
         """Joins the data with item information.
@@ -606,10 +706,16 @@ def join_with_items(self, data: pd.DataFrame) -> pd.DataFrame:
         Returns:
             Data joined with item information.
         """
-        # TODO: Do we want to merge on ID or should we just merge on description if we have it?
-        data["Item ID"] = data["Item Description Refined"].str.strip().map(self.item2id)
+        # TODO: Should we merge on ID, or on description when we have it?
+        data["Item ID"] = (
+            data["Item Description Refined"].str.strip().map(self.item2id)
+        )
         # Prevent duplicate columns when merging with items
-        data = data.rename(columns={"Item Description Refined": "Item Description Refined (Trial)"})
+        data = data.rename(
+            columns={
+                "Item Description Refined": "Item Description Refined (Trial)"
+            }
+        )
         drop_cols = ["Item Description From Trial"]
         data = data.drop(drop_cols, axis=1)
         if data["Item ID"].isna().sum() > 0:
@@ -628,7 +734,9 @@ def calculate_results(self, data: pd.DataFrame) -> pd.DataFrame:
         Returns:
             Data with calculated results.
         """
-        data["% Residuals (Mass)"] = data[self.weight_col] / (data["Start Weight"] * data["Number of Items per bag"])
+        data["% Residuals (Mass)"] = data[self.weight_col] / (
+            data["Start Weight"] * data["Number of Items per bag"]
+        )
         data["% Residuals (Area)"] = None
         data["Trial"] = data["Trial ID"]
         return data
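Reviewer note: the residuals formula in PDFPipeline.calculate_results normalizes the residual weight by the total mass that went into the bag (per-item start weight times item count). A quick arithmetic check with hypothetical numbers:

    import pandas as pd

    df = pd.DataFrame(
        {
            "Residual Weight - Oven-dry": [5.0],
            "Start Weight": [10.0],
            "Number of Items per bag": [4],
        }
    )
    # 5.0 residual over 10.0 * 4 = 40.0 starting mass -> 0.125, i.e. 12.5%.
    df["% Residuals (Mass)"] = df["Residual Weight - Oven-dry"] / (
        df["Start Weight"] * df["Number of Items per bag"]
    )
    print(df["% Residuals (Mass)"].iloc[0])  # 0.125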
""" - data["% Residuals (Mass)"] = data[self.weight_col] / (data["Start Weight"] * data["Number of Items per bag"]) + data["% Residuals (Mass)"] = data[self.weight_col] / ( + data["Start Weight"] * data["Number of Items per bag"] + ) data["% Residuals (Area)"] = None data["Trial"] = data["Trial ID"] return data @@ -636,7 +744,9 @@ def calculate_results(self, data: pd.DataFrame) -> pd.DataFrame: PDF_TRIALS = DATA_DIR / "Compiled Field Results - CFTP Gathered Data.xlsx" -ad001_pipeline = PDFPipeline(PDF_TRIALS, trial_name="ad001", sheet_name=0, skiprows=1) +ad001_pipeline = PDFPipeline( + PDF_TRIALS, trial_name="ad001", sheet_name=0, skiprows=1 +) processed_data.append(ad001_pipeline.run()) wr001_pipeline = PDFPipeline(PDF_TRIALS, trial_name="wr001", sheet_name=1) @@ -686,7 +796,9 @@ def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame: # Exclude mixed materials and multi-laminate pouches all_trials = all_trials[~(all_trials["Material Class II"] == "Mixed Materials")] -all_trials = all_trials[~(all_trials["Item Name"] == "Multi-laminate stand-up pounch with zipper")] +all_trials = all_trials[ + ~(all_trials["Item Name"] == "Multi-laminate stand-up pounch with zipper") +] # Exclude anything over 1000% as outlier all_trials = all_trials[all_trials["% Residuals (Mass)"] < OUTLIER_THRESHOLD] @@ -699,7 +811,8 @@ def preprocess_data(self, data: pd.DataFrame) -> pd.DataFrame: def anonymize_brand(brand: str) -> str: - """Anonymizes brand names by mapping them to a generic brand. Sorry for the global variable. + """Anonymizes brand names by mapping them to a generic brand. + Sorry for the global variable. Args: brand: The brand name @@ -720,17 +833,23 @@ def anonymize_brand(brand: str) -> str: # Make sure all trial IDs are represented in operating conditions -unique_trial_ids = pd.DataFrame(all_trials["Trial ID"].unique(), columns=["Trial ID"]).set_index("Trial ID") +unique_trial_ids = pd.DataFrame( + all_trials["Trial ID"].unique(), columns=["Trial ID"] +).set_index("Trial ID") df_operating_conditions_avg = unique_trial_ids.merge( df_operating_conditions_avg, left_index=True, right_index=True, how="left" ) operating_conditions_avg_output_path = DATA_DIR / "operating_conditions_avg.csv" -df_operating_conditions_avg.to_csv(operating_conditions_avg_output_path, index_label="Trial ID") +df_operating_conditions_avg.to_csv( + operating_conditions_avg_output_path, index_label="Trial ID" +) # Save full operating conditions data operating_conditions_output_path = DATA_DIR / "operating_conditions_full.csv" df_operating_conditions = pd.concat([df_temps, df_moisture, df_o2], axis=0) -df_operating_conditions.to_csv(operating_conditions_output_path, index=True, index_label="Time Step") +df_operating_conditions.to_csv( + operating_conditions_output_path, index=True, index_label="Time Step" +) print("Complete!")