Skip to content

Commit

Permalink
Updated clean data process to output .parquet file instead of .csv. A…
Browse files Browse the repository at this point in the history
…dding any missing weather cols as None. Setting a defined clean data column order. Replacing all np.nan values with None
  • Loading branch information
oislen committed Oct 21, 2024
1 parent 260adaa commit 951dfa5
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions webscraper/utilities/gen_clean_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import os
import re
import numpy as np
import pandas as pd
from beartype import beartype
import cons
Expand Down Expand Up @@ -47,6 +48,8 @@ def load_data(
for col in cons.col_options:
if col in dataframe.columns:
dataframe[col] = dataframe[col].apply(lambda x: x.strip()).replace('', None).astype(float)
else:
dataframe[col] = np.nan
# subset return columns
cols = dataframe.columns[~dataframe.columns.isin(['ind'])]
dataframe = dataframe[cols]
Expand All @@ -55,6 +58,8 @@ def load_data(
dataframe = pd.merge(left=dataframe, right=stations_data, on='station_id', how='inner').rename(columns={'station_id':'id'})
dataframe["county"] = dataframe["county"].str.title()
dataframe["date"] = pd.to_datetime(dataframe["date"], format='%d-%b-%Y')
dataframe = dataframe[cons.cleaned_data_cols]
dataframe = dataframe.replace({np.nan:None})
return dataframe


Expand Down Expand Up @@ -84,13 +89,24 @@ def gen_clean_data(
s3client = S3Client(sessionToken=cons.session_token_fpath)
for fpath in scraped_data_fpaths:
# extract basename
fname = os.path.basename(fpath)
scraped_fname, _ = os.path.splitext(os.path.basename(fpath))
cleaned_fextension = ".parquet"
# load data
clean_data = load_data(fpath)
# write data to clean data directory
cleaned_data_fpath = os.path.join(cleaned_data_dir, fname)
logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk")
clean_data.to_csv(cleaned_data_fpath, header=True, index=False)
if store_on_s3:
# store data on s3 back up repository
s3client.store(data=clean_data, bucket=cons.s3_bucket, key=f"{cons.s3_clean_directory}/{fname}")
clean_data_shape = clean_data.shape
# only rewrite file with data
if clean_data_shape[0] > 0:
logging.info(f"Clean Data Shape: {clean_data_shape}")
# write data to clean data directory
cleaned_data_fpath = os.path.join(cleaned_data_dir, f"{scraped_fname}{cleaned_fextension}")
logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk")
#clean_data.to_csv(cleaned_data_fpath, header=True, index=False)
clean_data.to_parquet(cleaned_data_fpath, index=False, schema=cons.cleaned_data_pa_schema)
if store_on_s3:
# store data on s3 back up repository
s3client.store(
data=clean_data,
bucket=cons.s3_bucket,
key=f"{cons.s3_clean_directory}/{scraped_fname}{cleaned_fextension}",
schema=cons.cleaned_data_pa_schema
)

0 comments on commit 951dfa5

Please sign in to comment.