From 951dfa59e0fd04ccdc1e55ff59d48ec899660250 Mon Sep 17 00:00:00 2001 From: Oisin Date: Mon, 21 Oct 2024 10:24:49 +0100 Subject: [PATCH] Updated clean data process to output .parquet file instead of .csv. Adding any missing weather cols as None. Setting a defined clean data column order. Replacing all np.nan values with None --- webscraper/utilities/gen_clean_data.py | 32 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/webscraper/utilities/gen_clean_data.py b/webscraper/utilities/gen_clean_data.py index 0a9296a..c36e1b1 100644 --- a/webscraper/utilities/gen_clean_data.py +++ b/webscraper/utilities/gen_clean_data.py @@ -1,6 +1,7 @@ import logging import os import re +import numpy as np import pandas as pd from beartype import beartype import cons @@ -47,6 +48,8 @@ def load_data( for col in cons.col_options: if col in dataframe.columns: dataframe[col] = dataframe[col].apply(lambda x: x.strip()).replace('', None).astype(float) + else: + dataframe[col] = np.nan # subset return columns cols = dataframe.columns[~dataframe.columns.isin(['ind'])] dataframe = dataframe[cols] @@ -55,6 +58,8 @@ def load_data( dataframe = pd.merge(left=dataframe, right=stations_data, on='station_id', how='inner').rename(columns={'station_id':'id'}) dataframe["county"] = dataframe["county"].str.title() dataframe["date"] = pd.to_datetime(dataframe["date"], format='%d-%b-%Y') + dataframe = dataframe[cons.cleaned_data_cols] + dataframe = dataframe.replace({np.nan:None}) return dataframe @@ -84,13 +89,24 @@ def gen_clean_data( s3client = S3Client(sessionToken=cons.session_token_fpath) for fpath in scraped_data_fpaths: # extract basename - fname = os.path.basename(fpath) + scraped_fname, _ = os.path.splitext(os.path.basename(fpath)) + cleaned_fextension = ".parquet" # load data clean_data = load_data(fpath) - # write data to clean data directory - cleaned_data_fpath = os.path.join(cleaned_data_dir, fname) - logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk") - clean_data.to_csv(cleaned_data_fpath, header=True, index=False) - if store_on_s3: - # store data on s3 back up repository - s3client.store(data=clean_data, bucket=cons.s3_bucket, key=f"{cons.s3_clean_directory}/{fname}") \ No newline at end of file + clean_data_shape = clean_data.shape + # only rewrite file with data + if clean_data_shape[0] > 0: + logging.info(f"Clean Data Shape: {clean_data_shape}") + # write data to clean data directory + cleaned_data_fpath = os.path.join(cleaned_data_dir, f"{scraped_fname}{cleaned_fextension}") + logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk") + #clean_data.to_csv(cleaned_data_fpath, header=True, index=False) + clean_data.to_parquet(cleaned_data_fpath, index=False, schema=cons.cleaned_data_pa_schema) + if store_on_s3: + # store data on s3 back up repository + s3client.store( + data=clean_data, + bucket=cons.s3_bucket, + key=f"{cons.s3_clean_directory}/{scraped_fname}{cleaned_fextension}", + schema=cons.cleaned_data_pa_schema + ) \ No newline at end of file