Merge pull request #34 from oislen/dev
33 athena table
oislen authored Oct 21, 2024
2 parents 084efd4 + 951dfa5 commit 195f0a3
Showing 6 changed files with 92 additions and 15 deletions.
Binary file modified data/master.feather
22 changes: 22 additions & 0 deletions data/webscraper/ref/cleaned_data_cols.json
@@ -0,0 +1,22 @@
+[
+    "id",
+    "county",
+    "station",
+    "open_year",
+    "close_year",
+    "height(m)",
+    "easting",
+    "northing",
+    "latitude",
+    "longitude",
+    "maxtp",
+    "mintp",
+    "gmin",
+    "rain",
+    "wdsp",
+    "soil",
+    "sun",
+    "evap",
+    "glorad",
+    "date"
+]
31 changes: 30 additions & 1 deletion webscraper/cons.py
@@ -2,6 +2,7 @@
 import os
 import sys
 import json
+import pyarrow as pa
 
 met_eir_historical_data_url = 'https://www.met.ie/climate/available-data/historical-data'
 stations_data_url = 'https://cli.fusio.net/cli/climate_data/webdata/StationDetails.csv'
@@ -14,6 +15,7 @@
 gis_dir = os.path.join(data_dir, "gis")
 met_eireann_dir = os.path.join(data_dir, 'Met_Eireann')
 bokeh_ref_data_dir = os.path.join(data_dir, "bokeh", "ref")
+webscraper_ref_data_dir = os.path.join(data_dir, "webscraper", "ref")
 master_data_fpath = os.path.join(data_dir, 'master.feather')
 preaggregate_data_fpath = os.path.join(data_dir, "preaggregate_data.pickle")
 bokeh_line_data_fpath = os.path.join(data_dir, "bokeh_line_data.pickle")
@@ -30,6 +32,7 @@
 col_options_fpath = os.path.join(bokeh_ref_data_dir, "col_options.json")
 stat_options_fpath = os.path.join(bokeh_ref_data_dir, "stat_options.json")
 agg_level_strftime_fpath = os.path.join(bokeh_ref_data_dir, "agg_level_strftime.json")
+cleaned_data_cols_fpath = os.path.join(webscraper_ref_data_dir, "cleaned_data_cols.json")
 session_token_fpath = os.path.join(creds_data, "sessionToken.json")
 
 # load bokeh reference data
@@ -39,9 +42,35 @@
     stat_options = json.load(json_file)
 with open(agg_level_strftime_fpath) as json_file:
     date_strftime_dict = json.load(json_file)
+with open(cleaned_data_cols_fpath) as json_file:
+    cleaned_data_cols = json.load(json_file)
 
 # aws s3 constants
 s3_bucket = "irishclimatedashboard"
 s3_scraped_directory = "data/Met_Eireann/scraped_data"
 s3_clean_directory = "data/Met_Eireann/cleaned_data"
-s3_fname = "dly{station_id}.csv"
\ No newline at end of file
+s3_fname = "dly{station_id}.csv"
+
+# create pyarrow schema for cleaned data
+cleaned_data_pa_schema = pa.schema([
+    pa.field("id", pa.uint64(), nullable=False),
+    pa.field("county", pa.string(), nullable=False),
+    pa.field("station", pa.string(), nullable=False),
+    pa.field("open_year", pa.uint16(), nullable=False),
+    pa.field("close_year", pa.uint16(), nullable=True),
+    pa.field("height(m)", pa.uint32(), nullable=False),
+    pa.field("easting", pa.uint64(), nullable=False),
+    pa.field("northing", pa.uint64(), nullable=False),
+    pa.field("latitude", pa.float64(), nullable=False),
+    pa.field("longitude", pa.float64(), nullable=False),
+    pa.field("maxtp", pa.float64(), nullable=True),
+    pa.field("mintp", pa.float64(), nullable=True),
+    pa.field("gmin", pa.float64(), nullable=True),
+    pa.field("rain", pa.float64(), nullable=True),
+    pa.field("wdsp", pa.float64(), nullable=True),
+    pa.field("soil", pa.float64(), nullable=True),
+    pa.field("sun", pa.float64(), nullable=True),
+    pa.field("evap", pa.float64(), nullable=True),
+    pa.field("glorad", pa.float64(), nullable=True),
+    pa.field("date", pa.date64(), nullable=False),
+])
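
The schema above pins an explicit Arrow type and nullability for every cleaned column, so each station's parquet file carries identical metadata for downstream consumers such as the Athena table this PR is building toward. A minimal sketch (not part of the commit; the station file name is hypothetical) of checking a written file against it:

import pyarrow.parquet as pq
import cons

# hypothetical cleaned file written by gen_clean_data below
fpath = "data/Met_Eireann/cleaned_data/dly532.parquet"
# read only the parquet footer; no row data is loaded
file_schema = pq.read_schema(fpath)
# equals() compares field names, types and nullability (file metadata is ignored by default)
assert file_schema.equals(cons.cleaned_data_pa_schema), "schema drift detected"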
20 changes: 15 additions & 5 deletions webscraper/utilities/S3Client.py
@@ -1,8 +1,10 @@
 import io
 import os
 import boto3
 import json
 import logging
 import pandas as pd
+import pyarrow as pa
+from typing import Union
 from beartype import beartype
 
@@ -28,7 +30,8 @@ def store(
        self,
        data:pd.DataFrame,
        key:str,
-       bucket:str="irishclimateapp"
+       bucket:str,
+       schema=None
        ):
        """Stores a Met Eireann data file on s3 as .csv or .parquet.
@@ -42,12 +45,19 @@
        Returns
        -------
        """
+       _, fextension = os.path.splitext(os.path.basename(key))
        try:
            logging.info(f"Storing data to S3://{bucket}/{key}")
-           csv_buf = io.StringIO()
-           data.to_csv(csv_buf, header=True, index=False)
-           csv_buf.seek(0)
-           self.client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=key)
+           if fextension == ".csv":
+               buf = io.StringIO()
+               data.to_csv(buf, header=True, index=False)
+               buf.seek(0)
+           elif fextension == ".parquet":
+               buf = io.BytesIO()
+               data.to_parquet(buf, index=False, schema=schema)
+           else:
+               raise ValueError(f"Invalid file extension {fextension}")
+           self.client.put_object(Bucket=bucket, Body=buf.getvalue(), Key=key)
        except Exception as e:
            logging.info(str(e))
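
A hypothetical call site for the extended store method (the import paths and station file name are illustrative assumptions; gen_clean_data.py below is the real caller):

import cons
from utilities.gen_clean_data import load_data
from utilities.S3Client import S3Client

# clean one hypothetical scraped file with the routine shown below
clean_data = load_data("data/Met_Eireann/scraped_data/dly532.csv")
s3client = S3Client(sessionToken=cons.session_token_fpath)
# the key's extension now selects the writer: .csv -> StringIO, .parquet -> BytesIO
s3client.store(
    data=clean_data,
    bucket=cons.s3_bucket,
    key=f"{cons.s3_clean_directory}/dly532.parquet",
    schema=cons.cleaned_data_pa_schema,
)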
32 changes: 24 additions & 8 deletions webscraper/utilities/gen_clean_data.py
@@ -1,6 +1,7 @@
 import logging
 import os
 import re
+import numpy as np
 import pandas as pd
 from beartype import beartype
 import cons
@@ -47,6 +48,8 @@ def load_data(
     for col in cons.col_options:
         if col in dataframe.columns:
             dataframe[col] = dataframe[col].apply(lambda x: x.strip()).replace('', None).astype(float)
+        else:
+            dataframe[col] = np.nan
     # subset return columns
     cols = dataframe.columns[~dataframe.columns.isin(['ind'])]
     dataframe = dataframe[cols]
@@ -55,6 +58,8 @@
     dataframe = pd.merge(left=dataframe, right=stations_data, on='station_id', how='inner').rename(columns={'station_id':'id'})
     dataframe["county"] = dataframe["county"].str.title()
     dataframe["date"] = pd.to_datetime(dataframe["date"], format='%d-%b-%Y')
+    dataframe = dataframe[cons.cleaned_data_cols]
+    dataframe = dataframe.replace({np.nan:None})
     return dataframe
 
 
@@ -84,13 +89,24 @@ def gen_clean_data(
     s3client = S3Client(sessionToken=cons.session_token_fpath)
     for fpath in scraped_data_fpaths:
         # extract basename
-        fname = os.path.basename(fpath)
+        scraped_fname, _ = os.path.splitext(os.path.basename(fpath))
+        cleaned_fextension = ".parquet"
         # load data
         clean_data = load_data(fpath)
-        # write data to clean data directory
-        cleaned_data_fpath = os.path.join(cleaned_data_dir, fname)
-        logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk")
-        clean_data.to_csv(cleaned_data_fpath, header=True, index=False)
-        if store_on_s3:
-            # store data on s3 back up repository
-            s3client.store(data=clean_data, bucket=cons.s3_bucket, key=f"{cons.s3_clean_directory}/{fname}")
+        clean_data_shape = clean_data.shape
+        # only rewrite file with data
+        if clean_data_shape[0] > 0:
+            logging.info(f"Clean Data Shape: {clean_data_shape}")
+            # write data to clean data directory
+            cleaned_data_fpath = os.path.join(cleaned_data_dir, f"{scraped_fname}{cleaned_fextension}")
+            logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk")
+            #clean_data.to_csv(cleaned_data_fpath, header=True, index=False)
+            clean_data.to_parquet(cleaned_data_fpath, index=False, schema=cons.cleaned_data_pa_schema)
+            if store_on_s3:
+                # store data on s3 back up repository
+                s3client.store(
+                    data=clean_data,
+                    bucket=cons.s3_bucket,
+                    key=f"{cons.s3_clean_directory}/{scraped_fname}{cleaned_fextension}",
+                    schema=cons.cleaned_data_pa_schema
+                )
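
Not every station reports every statistic, so load_data's new else branch pads absent columns with np.nan before the frame is reindexed to cons.cleaned_data_cols; without the padding, files missing a field would not fit the parquet schema above. A toy illustration (the column names stand in for cons.col_options):

import numpy as np
import pandas as pd

# a station file that only reports two of the expected statistics
df = pd.DataFrame({"maxtp": [12.1], "rain": [0.4]})
for col in ["maxtp", "mintp", "rain", "evap"]:  # stand-in for cons.col_options
    if col not in df.columns:
        df[col] = np.nan  # pad so every cleaned file has the full column set
print(df.columns.tolist())  # ['maxtp', 'rain', 'mintp', 'evap']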
2 changes: 1 addition & 1 deletion webscraper/utilities/gen_master_data.py
@@ -27,7 +27,7 @@ def gen_master_data(
     met_eireann_fpaths = [os.path.join(cleaned_data_dir, fname) for fname in os.listdir(cleaned_data_dir)]
     logging.info("Reading and concatenating files ...")
     # load and concatenate data files together
-    data_list = [pd.read_csv(fpath) for fpath in met_eireann_fpaths]
+    data_list = [pd.read_parquet(fpath) for fpath in met_eireann_fpaths]
     data = pd.concat(objs=data_list, ignore_index=True, axis=0)
     # convert date to datetime
     data["date"] = pd.to_datetime(data["date"], format="%Y-%m-%d")
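
Since every cleaned file now shares one Arrow schema, the per-file read-and-concat loop could equally become a single directory read. A sketch of that alternative (not what the commit does; the path assumes the usual cleaned_data directory):

import pyarrow.parquet as pq

# read every parquet file under the directory as one Arrow table, convert once
data = pq.read_table("data/Met_Eireann/cleaned_data").to_pandas()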
