diff --git a/.dockerignore b/.dockerignore index 5bf512a..4591f18 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,5 +1,6 @@ data/Met_Eireann/arch data/Met_Eireann/scraped_data +data/Met_Eireann/cleaned_data *__pycache__ *.ipynb_checkpoints *.xlsx diff --git a/.gitignore b/.gitignore index e78ee81..57a5a11 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ # ignore sub repos data/Met_Eireann/arch data/Met_Eireann/scraped_data +data/Met_Eireann/cleaned_data *__pycache__ *.ipynb_checkpoints *.xlsx diff --git a/data/master.feather b/data/master.feather index 8a15ed4..0c403c9 100644 Binary files a/data/master.feather and b/data/master.feather differ diff --git a/exeWebscrapeData.bat b/exeWebscrapeData.bat index 4e1eb1e..d51547f 100644 --- a/exeWebscrapeData.bat +++ b/exeWebscrapeData.bat @@ -1 +1 @@ -call python webscraper\prg_webscrape_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data \ No newline at end of file +call python webscraper\prg_webscrape_data.py --run_met_data --run_clean_data --run_master_data --run_preagg_data --run_map_data --run_points_data \ No newline at end of file diff --git a/webscraper/arch/gen_boto3_excel.py b/webscraper/arch/gen_boto3_excel.py deleted file mode 100644 index 3c5b74d..0000000 --- a/webscraper/arch/gen_boto3_excel.py +++ /dev/null @@ -1,55 +0,0 @@ -import io -import boto3 -import json -from beartype import beartype - -@beartype -def gen_boto3_excel( - sessionToken:str, - bucket:str="irishclimateapp", - prefix:str="data/Met_Eireann" - ) -> list: - """Retrieves the raw Met Eireann data from AWS s3 - - Parameters - ---------- - sessionToken : str - The file path to an active aws session token - bucket : str - The s3 bucket containing the Met Eireann data files - prefix : str - The s3 directory containing the Met Eireann data files - - Returns - ------- - list - The raw Met Eireann data - """ - # load aws config - with open(sessionToken, "r") as j: - aws_config = json.loads(j.read()) - # connect to aws boto3 - session = boto3.Session( - aws_access_key_id=aws_config['Credentials']["AccessKeyId"], - aws_secret_access_key=aws_config['Credentials']["SecretAccessKey"], - aws_session_token=aws_config['Credentials']["SessionToken"], - region_name="eu-west-1" - ) - # generate boto3 s3 connection - client = session.client("s3") - # create a paginator to list all objects - paginator = client.get_paginator("list_objects_v2") - # apply the paginator to list all files in the irishclimateapp bucket with key data/Met_Eireann - operation_parameters = {"Bucket": bucket, "Prefix": prefix} - page_iterator = paginator.paginate(**operation_parameters) - # filter down contents keys with .xlsx - filtered_iterator = page_iterator.search("Contents[?contains(Key,'.xlsx')].Key") - # extract out the file keys - file_keys = [content_key for content_key in filtered_iterator] - # load s3 objects into list - objs_list = [ - client.get_object(Bucket=bucket, Key=file_key) for file_key in file_keys - ] - # decode xlsx files in body - data_list = [io.BytesIO(obj["Body"].read()) for obj in objs_list] - return data_list diff --git a/webscraper/cons.py b/webscraper/cons.py index 546bdfb..dc5e3e9 100644 --- a/webscraper/cons.py +++ b/webscraper/cons.py @@ -10,6 +10,7 @@ sys.path.append(root_dir) # set directories data_dir = os.path.join(root_dir, 'data') +creds_data = os.path.join(root_dir, '.creds') gis_dir = os.path.join(data_dir, "gis") met_eireann_dir = os.path.join(data_dir, 'Met_Eireann') bokeh_ref_data_dir 
= os.path.join(data_dir, "bokeh", "ref") @@ -23,11 +24,13 @@ map_data_fpath = os.path.join(gis_dir, "map_data.pickle") points_data_fpath = os.path.join(gis_dir, "points_data.pickle") scraped_data_dir = os.path.join(met_eireann_dir, 'scraped_data') +cleaned_data_dir = os.path.join(met_eireann_dir, 'cleaned_data') stations_fpath = os.path.join(met_eireann_dir, 'ref', 'StationDetails.csv') unittest_normal_dists_fpath = os.path.join(bokeh_ref_data_dir, "unittest_normal_dists.json") col_options_fpath = os.path.join(bokeh_ref_data_dir, "col_options.json") stat_options_fpath = os.path.join(bokeh_ref_data_dir, "stat_options.json") agg_level_strftime_fpath = os.path.join(bokeh_ref_data_dir, "agg_level_strftime.json") +session_token_fpath = os.path.join(creds_data, "sessionToken.json") # load bokeh reference data with open(col_options_fpath) as json_file: @@ -35,4 +38,10 @@ with open(stat_options_fpath) as json_file: stat_options = json.load(json_file) with open(agg_level_strftime_fpath) as json_file: - date_strftime_dict = json.load(json_file) \ No newline at end of file + date_strftime_dict = json.load(json_file) + +# aws s3 constants +s3_bucket = "irishclimatedashboard" +s3_scraped_directory = "data/Met_Eireann/scraped_data" +s3_clean_directory = "data/Met_Eireann/cleaned_data" +s3_fname = "dly{station_id}.csv" \ No newline at end of file diff --git a/webscraper/prg_webscrape_data.py b/webscraper/prg_webscrape_data.py index a5c9848..35703e2 100644 --- a/webscraper/prg_webscrape_data.py +++ b/webscraper/prg_webscrape_data.py @@ -3,34 +3,37 @@ import time from beartype import beartype from utilities.commandline_interface import commandline_interface -from utilities.load_stations_data import load_stations_data -from utilities.retrieve_station_data import retrieve_station_data +from utilities.gen_met_data import gen_met_data +from utilities.gen_clean_data import gen_clean_data from utilities.gen_master_data import gen_master_data -from utilities.gen_preaggregate_data import gen_preaggregate_data -from utilities.gen_counties_data import gen_counties_data -from utilities.gen_stations_data import gen_stations_data +from utilities.gen_preagg_data import gen_preagg_data +from utilities.gen_map_data import gen_map_data +from utilities.gen_points_data import gen_points_data @beartype def webscrape_data( - retrieve_data:bool, - generate_master_data:bool, - generate_preaggregated_data:bool, - generate_counties_data:bool, - generate_stations_data:bool + run_met_data:bool, + run_clean_data:bool, + run_master_data:bool, + run_preagg_data:bool, + run_map_data:bool, + run_points_data:bool ): """Webscrape and process met data into dashboard files Parameters ---------- - retrieve_data : bool + run_met_data : bool Retrieves / web scrapes the historical met data - generate_master_data : bool + run_clean_data : bool + Cleans and processes the scraped met data + run_master_data : bool Generates the master data file from the retrieved / web scraped met data files - generate_preaggregated_data : bool + run_preagg_data : bool Preaggreates the master data file into various date levels for the bokeh dashboard app - generate_counties_data : bool - Generates the counties gis file for the bokeh dashboard app - generate_stations_data : bool + run_map_data : bool + Generates the map gis file for the bokeh dashboard app + run_points_data : bool Generates the stations gis file for the bokeh dashboard app Returns @@ -38,28 +41,61 @@ def webscrape_data( """ # start timer t0 = time.time() - if retrieve_data: + + if run_met_data: 
logging.info('~~~~~ Retrieving data for met stations ...') - # load stations data - stations = load_stations_data(stations_fpath=cons.stations_fpath, filter_open=True) # run webscraper - resp_log = retrieve_station_data(stations=stations, scraped_data_dir=cons.scraped_data_dir, data_level="dly") - if generate_master_data: + gen_met_data( + stations_fpath=cons.stations_fpath, + filter_open=True, + topn_stations=5, + scraped_data_dir=cons.scraped_data_dir, data_level="dly" + ) + + if run_clean_data: + logging.info('~~~~~ Cleaning met stations data ...') + # run data cleaning + gen_clean_data( + scraped_data_dir=cons.scraped_data_dir, + cleaned_data_dir=cons.cleaned_data_dir, + store_on_s3=False + ) + + if run_master_data: logging.info('~~~~~ Generating master data file ...') # generate master data file - gen_master_data(master_data_fpath = cons.master_data_fpath) - if generate_preaggregated_data: + gen_master_data( + cleaned_data_dir=cons.cleaned_data_dir, + master_data_fpath=cons.master_data_fpath + ) + + if run_preagg_data: logging.info('~~~~~ Generating preaggregated data file ...') # generate the preaggregate data - gen_preaggregate_data(preaggregate_data_fpath = cons.preaggregate_data_fpath) - if generate_counties_data: - logging.info('~~~~~ Generating geospatial counties data file ...') + gen_preagg_data( + master_data_fpath=cons.master_data_fpath, + preaggregate_data_fpath=cons.preaggregate_data_fpath + ) + + if run_map_data: + logging.info('~~~~~ Generating geospatial map data file ...') # generate counties data - gen_counties_data(map_data_fpath = cons.map_data_fpath) - if generate_stations_data: - logging.info('~~~~~ Generating geospatial stations data file ...') + gen_map_data( + rep_counties_fpath=cons.rep_counties_fpath, + ni_counties_fpath=cons.ni_counties_fpath, + preaggregate_data_fpath=cons.preaggregate_data_fpath, + map_data_fpath=cons.map_data_fpath + ) + + if run_points_data: + logging.info('~~~~~ Generating geospatial points data file ...') # generate wheather station points data - gen_stations_data(points_data_fpath = cons.points_data_fpath) + gen_points_data( + master_data_fpath=cons.master_data_fpath, + stations_fpath=cons.stations_fpath, + points_data_fpath=cons.points_data_fpath + ) + # end timer and log result t1 = time.time() tres = t1 - t0 @@ -71,13 +107,16 @@ def webscrape_data( # set up logging lgr = logging.getLogger() lgr.setLevel(logging.INFO) + # handle input parameters input_params_dict = commandline_interface() + # call webscrape data webscrape_data( - retrieve_data=input_params_dict['retrieve_data'], - generate_master_data=input_params_dict['generate_master_data'], - generate_preaggregated_data=input_params_dict['generate_preaggregated_data'], - generate_counties_data=input_params_dict['generate_counties_data'], - generate_stations_data=input_params_dict['generate_stations_data'] + run_met_data=input_params_dict['run_met_data'], + run_clean_data=input_params_dict['run_clean_data'], + run_master_data=input_params_dict['run_master_data'], + run_preagg_data=input_params_dict['run_preagg_data'], + run_map_data=input_params_dict['run_map_data'], + run_points_data=input_params_dict['run_points_data'] ) \ No newline at end of file diff --git a/webscraper/utilities/S3Client.py b/webscraper/utilities/S3Client.py new file mode 100644 index 0000000..5202865 --- /dev/null +++ b/webscraper/utilities/S3Client.py @@ -0,0 +1,84 @@ +import io +import boto3 +import json +import logging +import pandas as pd +from typing import Union +from beartype import beartype + +class 
S3Client(): + + @beartype + def __init__(self, sessionToken:str): + # load aws config + with open(sessionToken, "r") as j: + aws_config = json.loads(j.read()) + # connect to aws boto3 + self.session = boto3.Session( + aws_access_key_id=aws_config['Credentials']["AccessKeyId"], + aws_secret_access_key=aws_config['Credentials']["SecretAccessKey"], + aws_session_token=aws_config['Credentials']["SessionToken"], + region_name="eu-west-1" + ) + # generate boto3 s3 connection + self.client = self.session.client("s3") + + @beartype + def store( + self, + data:pd.DataFrame, + key:str, + bucket:str="irishclimateapp" + ): + """Stores a Met Eireann data file on s3 as a .csv file. + + Parameters + ---------- + key : str + The s3 key to store the Met Eireann data file at + bucket : str + The s3 bucket storing the Met Eireann data files + + Returns + ------- + """ + try: + logging.info(f"Storing data to S3://{bucket}/{key}") + csv_buf = io.StringIO() + data.to_csv(csv_buf, header=True, index=False) + csv_buf.seek(0) + self.client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=key) + except Exception as e: + logging.info(str(e)) + + @beartype + def retrieve( + self, + key:str, + bucket:str="irishclimateapp" + ): + + """Retrieves a Met Eireann data file from AWS s3. + + Parameters + ---------- + key : str + The s3 key containing the Met Eireann data file + bucket : str + The s3 bucket containing the Met Eireann data file + + Returns + ------- + + The Met Eireann data, or None if retrieval fails + """ + data = None + try: + logging.info(f"Retrieving data from S3://{bucket}/{key}") + # retrieve the s3 object from the bucket + obj = self.client.get_object(Bucket=bucket, Key=key) + # read the .csv file from the object body + data = pd.read_csv(obj["Body"]) + except Exception as e: + logging.info(str(e)) + return data diff --git a/webscraper/utilities/commandline_interface.py b/webscraper/utilities/commandline_interface.py index b9efe59..f1cc605 100644 --- a/webscraper/utilities/commandline_interface.py +++ b/webscraper/utilities/commandline_interface.py @@ -4,12 +4,6 @@ def commandline_interface(): """A commandline interface for parsing input parameters with - Windows - python IrishClimateDashboard\\webscraper\\prg_webscraper_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data - - Linux - python3 IrishClimateDashboard/webscraper/prg_webscraper_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data - Parameters ---------- @@ -21,19 +15,21 @@ def commandline_interface(): # define argument parser object parser = argparse.ArgumentParser(description="Execute Random TeleCom Data Programme.") # add input arguments - parser.add_argument("--retrieve_data", action=argparse.BooleanOptionalAction, dest="retrieve_data", type=bool, default=False, help="Boolean, retrieves / web scrapes the historical met data",) - parser.add_argument("--generate_master_data", action=argparse.BooleanOptionalAction, dest="generate_master_data", type=bool, default=False, help="Boolean, generates the master data file from the retrieved / web scraped met data files",) - parser.add_argument("--generate_preaggregated_data", action=argparse.BooleanOptionalAction, dest="generate_preaggregated_data", type=bool, default=False, help="Boolean, preaggreates the master data file into various date levels for the bokeh dashboard app",) - parser.add_argument("--generate_counties_data", action=argparse.BooleanOptionalAction, dest="generate_counties_data", type=bool,
default=False, help="Boolean, generates the counties gis file for the bokeh dashboard app",) - parser.add_argument("--generate_stations_data", action=argparse.BooleanOptionalAction, dest="generate_stations_data", type=bool, default=False, help="Boolean, generates the stations gis file for the bokeh dashboard app",) + parser.add_argument("--run_met_data", action=argparse.BooleanOptionalAction, dest="run_met_data", type=bool, default=False, help="Boolean, retrieves / web scrapes the historical met data",) + parser.add_argument("--run_clean_data", action=argparse.BooleanOptionalAction, dest="run_clean_data", type=bool, default=False, help="Boolean, cleans and processes the scraped met data",) + parser.add_argument("--run_master_data", action=argparse.BooleanOptionalAction, dest="run_master_data", type=bool, default=False, help="Boolean, generates the master data file from the retrieved / web scraped met data files",) + parser.add_argument("--run_preagg_data", action=argparse.BooleanOptionalAction, dest="run_preagg_data", type=bool, default=False, help="Boolean, preaggreates the master data file into various date levels for the bokeh dashboard app",) + parser.add_argument("--run_map_data", action=argparse.BooleanOptionalAction, dest="run_map_data", type=bool, default=False, help="Boolean, generates the map gis file for the bokeh dashboard app",) + parser.add_argument("--run_points_data", action=argparse.BooleanOptionalAction, dest="run_points_data", type=bool, default=False, help="Boolean, generates the stations gis file for the bokeh dashboard app",) # create an output dictionary to hold the results input_params_dict = {} # extract input arguments args = parser.parse_args() # map input arguments into output dictionary - input_params_dict["retrieve_data"] = args.retrieve_data - input_params_dict["generate_master_data"] = args.generate_master_data - input_params_dict["generate_preaggregated_data"] = args.generate_preaggregated_data - input_params_dict["generate_counties_data"] = args.generate_counties_data - input_params_dict["generate_stations_data"] = args.generate_stations_data + input_params_dict["run_met_data"] = args.run_met_data + input_params_dict["run_clean_data"] = args.run_clean_data + input_params_dict["run_master_data"] = args.run_master_data + input_params_dict["run_preagg_data"] = args.run_preagg_data + input_params_dict["run_map_data"] = args.run_map_data + input_params_dict["run_points_data"] = args.run_points_data return input_params_dict diff --git a/webscraper/utilities/load_data.py b/webscraper/utilities/gen_clean_data.py similarity index 56% rename from webscraper/utilities/load_data.py rename to webscraper/utilities/gen_clean_data.py index 0b84b68..0a9296a 100644 --- a/webscraper/utilities/load_data.py +++ b/webscraper/utilities/gen_clean_data.py @@ -1,9 +1,10 @@ - -import re +import logging import os +import re import pandas as pd -import cons from beartype import beartype +import cons +from utilities.S3Client import S3Client @beartype def load_data( @@ -17,8 +18,7 @@ def load_data( fpath : str The file path to load the webscraped met data from disk stations_fpath : str - The file path to load the reference station data from disk - + The file path to load the reference station data from disk, default is cons.stations_fpath Returns ------- @@ -56,3 +56,41 @@ def load_data( dataframe["county"] = dataframe["county"].str.title() dataframe["date"] = pd.to_datetime(dataframe["date"], format='%d-%b-%Y') return dataframe + + +@beartype +def gen_clean_data( + 
scraped_data_dir:str=cons.scraped_data_dir, + cleaned_data_dir:str=cons.cleaned_data_dir, + store_on_s3:bool=False + ): + """Cleans the individual scraped Met Eireann .csv files and writes the results to the cleaned data directory + + Parameters + ---------- + scraped_data_dir : str + The local directory to load the raw Met Eireann .csv files from, default is cons.scraped_data_dir + cleaned_data_dir : str + The local directory to write the cleaned Met Eireann .csv files to, default is cons.cleaned_data_dir + store_on_s3 : bool + Whether to back up the clean data files on s3, default is False + + Returns + ------- + """ + # load data files from file directory + scraped_data_fpaths = [os.path.join(scraped_data_dir, fname) for fname in os.listdir(scraped_data_dir)] + logging.info("Reading, cleaning and storing files ...") + s3client = S3Client(sessionToken=cons.session_token_fpath) + for fpath in scraped_data_fpaths: + # extract basename + fname = os.path.basename(fpath) + # load data + clean_data = load_data(fpath) + # write data to clean data directory + cleaned_data_fpath = os.path.join(cleaned_data_dir, fname) + logging.info(f"Writing cleaned data file {cleaned_data_fpath} to disk") + clean_data.to_csv(cleaned_data_fpath, header=True, index=False) + if store_on_s3: + # store data on s3 back up repository + s3client.store(data=clean_data, bucket=cons.s3_bucket, key=f"{cons.s3_clean_directory}/{fname}") \ No newline at end of file diff --git a/webscraper/utilities/gen_counties_data.py b/webscraper/utilities/gen_map_data.py similarity index 60% rename from webscraper/utilities/gen_counties_data.py rename to webscraper/utilities/gen_map_data.py index 631acc4..e7bc019 100644 --- a/webscraper/utilities/gen_counties_data.py +++ b/webscraper/utilities/gen_map_data.py @@ -8,31 +8,36 @@ from typing import Union @beartype -def gen_counties_data( - pre_agg_data_dict:Union[dict,None]=None, - map_data_fpath:Union[str,None]=None, +def gen_map_data( + rep_counties_fpath:str=cons.rep_counties_fpath, + ni_counties_fpath:str=cons.ni_counties_fpath, + preaggregate_data_fpath:str=cons.preaggregate_data_fpath, + map_data_fpath:str=cons.map_data_fpath ): """Generates counties map data for the bokeh map dashboard Parameters ---------- - pre_agg_data_dict : None or dict - Either the preaggregated data dictionary or loads the preaggregated data dictionary from disk when None, default is None - map_data_fpath : None or str - The file location to write the map data to disk, default is None + rep_counties_fpath : str + The file path to the Republic of Ireland counties .shp file on disk, default is cons.rep_counties_fpath + ni_counties_fpath : str + The file path to the Northern Ireland counties .shp file on disk, default is cons.ni_counties_fpath + preaggregate_data_fpath : str + The file path to the preaggregated data on disk, default is cons.preaggregate_data_fpath + map_data_fpath : str + The file location to write the map data to disk, default is cons.map_data_fpath Returns ------- """ logging.info("Loading rep / ni counties shape files ...") # load in county shape files - rep_counties = (gpd.read_file(cons.rep_counties_fpath)[["ENGLISH", "geometry"]].rename(columns={"ENGLISH": "county"}).to_crs(epsg=2157)) - ni_counties = gpd.read_file(cons.ni_counties_fpath)[["county", "geometry"]].to_crs(epsg=2157) - if type(pre_agg_data_dict) == type(None): - logging.info("Loading preaggregated data dictionary ...") - # load preaggregated data - with open(cons.preaggregate_data_fpath, "rb") as f: - pre_agg_data_dict = pickle.load(f) + rep_counties =
(gpd.read_file(rep_counties_fpath)[["ENGLISH", "geometry"]].rename(columns={"ENGLISH": "county"}).to_crs(epsg=2157)) + ni_counties = gpd.read_file(ni_counties_fpath)[["county", "geometry"]].to_crs(epsg=2157) + logging.info("Loading preaggregated data dictionary ...") + # load preaggregated data + with open(preaggregate_data_fpath, "rb") as f: + pre_agg_data_dict = pickle.load(f) logging.info("Concatenating counties geopandas dataframes ...") # concatenate county shape files counties = gpd.GeoDataFrame(pd.concat([rep_counties, ni_counties], ignore_index=True), crs="EPSG:2157") @@ -61,19 +66,16 @@ county_data = pre_agg_data.groupby(group_cols, as_index=False).agg(agg_dict) county_data['stat'] = stat map_data_list.append(county_data) - # map_data = pd.concat(objs=map_data_list,axis=0,ignore_index=True) # join county level data to map data map_geodata = gpd.GeoDataFrame( data=pd.merge(left=counties, right=map_data, on="county", how="left"), crs="EPSG:2157", ) - # if the output - if map_data_fpath != None: - if os.path.exists(map_data_fpath): - logging.info("Writing counties data to disk as pickle file ...") - # pickle the preaggregated data dictionary to disk - with open(map_data_fpath, "wb") as f: - pickle.dump(map_geodata, f, protocol=pickle.HIGHEST_PROTOCOL) - else: - raise ValueError(f"{map_data_fpath} does not exist") + if os.path.exists(map_data_fpath): + logging.info("Writing counties data to disk as pickle file ...") + # pickle the map geodata to disk + with open(map_data_fpath, "wb") as f: + pickle.dump(map_geodata, f, protocol=pickle.HIGHEST_PROTOCOL) + else: + raise ValueError(f"{map_data_fpath} does not exist") diff --git a/webscraper/utilities/gen_master_data.py b/webscraper/utilities/gen_master_data.py index 02081a9..6caef31 100644 --- a/webscraper/utilities/gen_master_data.py +++ b/webscraper/utilities/gen_master_data.py @@ -4,41 +4,39 @@ import cons from beartype import beartype from typing import Union -from webscraper.utilities.load_data import load_data @beartype def gen_master_data( - met_eireann_fpaths:Union[list,None]=None, - master_data_fpath:Union[str,None]=None, + cleaned_data_dir:str=cons.cleaned_data_dir, + master_data_fpath:str=cons.master_data_fpath, ): """Generates the master data from the individual raw Met Eireann .xlsx files Parameters ---------- - met_eireann_fpaths : None or list - The raw Met Eireann .xlsx file paths, default is None - master_data_fpath : None or str - The file location to write the master data to disk, default is None + cleaned_data_dir : str + The local directory containing the cleaned Met Eireann .csv files, default is cons.cleaned_data_dir + master_data_fpath : str + The file location to write the master data to disk, default is cons.master_data_fpath Returns ------- """ - # if load data locally - if met_eireann_fpaths == None: - logging.info("Retrieving raw met eireann .xlsx file paths from disk ...") - # load data files from file directory - met_eireann_fpaths = [os.path.join(cons.scraped_data_dir, fname) for fname in os.listdir(cons.scraped_data_dir)] - logging.info("Reading, concatenating and cleaning .xlsx files ...") + logging.info("Retrieving cleaned file paths from disk ...") + # load data files from file directory + met_eireann_fpaths = [os.path.join(cleaned_data_dir, fname) for fname in os.listdir(cleaned_data_dir)] + logging.info("Reading and concatenating files ...") # load and concatenate data files together - data_list = [load_data(fpath) for fpath in met_eireann_fpaths] + data_list = [pd.read_csv(fpath) for
fpath in met_eireann_fpaths] data = pd.concat(objs=data_list, ignore_index=True, axis=0) + # convert date to datetime + data["date"] = pd.to_datetime(data["date"], format="%Y-%m-%d") # order results by county, id and date alphabetically data = data.sort_values(by=["county", "id", "date"]).reset_index(drop=True) # if the output - if master_data_fpath != None: - if os.path.exists(master_data_fpath): - logging.info("Writing master file to disk as .feather file ...") - # save concatenated data to disk - data.to_feather(master_data_fpath) - else: - raise ValueError(f"{master_data_fpath} does not exist") \ No newline at end of file + if os.path.exists(master_data_fpath): + logging.info("Writing master file to disk as .feather file ...") + # save concatenated data to disk + data.to_feather(master_data_fpath) + else: + raise ValueError(f"{master_data_fpath} does not exist") \ No newline at end of file diff --git a/webscraper/utilities/gen_met_data.py b/webscraper/utilities/gen_met_data.py new file mode 100644 index 0000000..a0c8fa0 --- /dev/null +++ b/webscraper/utilities/gen_met_data.py @@ -0,0 +1,82 @@ +import logging +import os +import pandas as pd +import urllib.request +from beartype import beartype +from typing import Union +import cons + +@beartype +def url_retrieve( + stationid:int, + scraped_data_dir:str=cons.scraped_data_dir, + data_level:str="dly" + ): + """Retrieves met data for a given station id + + Parameters + ---------- + stationid : int + The station id to retrieve data for + scraped_data_dir : str + The file directory to write the scraped met data to, default is cons.scraped_data_dir + data_level : str + The time level of the met data to scrape, default is "dly" + + Returns + ------- + urllib.request.urlretrieve, Exception + A retrieval response + """ + data_fname = f"{data_level}{stationid}.csv" + data_url = f"http://cli.fusio.net/cli/climate_data/webdata/{data_fname}" + download_data_fpath = os.path.join(scraped_data_dir, data_fname) + try: + resp = urllib.request.urlretrieve(data_url, download_data_fpath) + except Exception as e: + resp = e + return resp + +@beartype +def gen_met_data( + stations_fpath:str=cons.stations_fpath, + filter_open:bool=True, + topn_stations:Union[int, None]=None, + scraped_data_dir:str=cons.scraped_data_dir, + data_level:str="dly" + ): + """Webscrapes the met data for all station ids in a given stations dataframe + + Parameters + ---------- + stations_fpath : pd.DataFrame + The file path to the met eireann stations reference data, default is cons.stations_fpath + filter_open : bool + Whether to only filter for only open weather stations in the met eireann stations reference data, default is True + topn_stations : int + The number of stations to sample from the head of the met eireann stations reference data, default is None + scraped_data_dir : str + The file directory to write the scraped met data to, default is cons.scraped_data_dir + data_level : str + The time level of the met data to scrape, default is "dly" + + + Returns + ------- + """ + # load stations data + stations = pd.read_csv(stations_fpath) + if filter_open: + # only consider open stations for now + open_stations_filter = stations['close_year'].isnull() + stations = stations.loc[open_stations_filter, :].reset_index(drop=True) + if topn_stations != None: + stations = stations.head(topn_stations) + # iterate over each station and pull daily level data using using stationid + resp_log =[] + for idx, row in stations.iterrows(): + logging.info(f"{idx} {row['county']} {row['station_id']} 
{row['name']}") + resp = url_retrieve(stationid=row['station_id'], scraped_data_dir=scraped_data_dir, data_level=data_level) + logging.info(resp) + resp_log.append(resp) + \ No newline at end of file diff --git a/webscraper/utilities/gen_stations_data.py b/webscraper/utilities/gen_points_data.py similarity index 54% rename from webscraper/utilities/gen_stations_data.py rename to webscraper/utilities/gen_points_data.py index 8f1cf5b..dc4b211 100644 --- a/webscraper/utilities/gen_stations_data.py +++ b/webscraper/utilities/gen_points_data.py @@ -8,31 +8,35 @@ from typing import Union @beartype -def gen_stations_data( - points_data_fpath:Union[str,None]=None +def gen_points_data( + master_data_fpath:str=cons.master_data_fpath, + stations_fpath:str=cons.stations_fpath, + points_data_fpath:str=cons.points_data_fpath ): """Generates gis points data for Met Eireann stations Parameters ---------- + master_data_fpath : str + The file path to the master data on disk, default is cons.master_data_fpath + station_fpath : str + The file path to the stations reference data on disk, default is cons.stations_fpath points_data_fpath : str - The file location to write the gis points data to disk, default is None + The file location to write the gis points data to disk, default is cons.points_data_fpath Returns ------- """ logging.info("Loading master and stations data from disk ...") # load master and station data - master_data = pd.read_feather(cons.master_data_fpath) - stations_data = pd.read_csv(cons.stations_fpath) + master_data = pd.read_feather(master_data_fpath) + stations_data = pd.read_csv(stations_fpath) logging.info("Identifying master station ids ...") # extract out station ids from mater file master_station_ids = master_data["id"].unique() logging.info("Filtering corresponding station data ...") # filter master data with station ids - master_stations = stations_data.loc[ - stations_data["station_id"].isin(master_station_ids), : - ].copy() + master_stations = stations_data.loc[stations_data["station_id"].isin(master_station_ids), :].copy() master_stations["county"] = master_stations["county"].str.title() master_stations["name"] = master_stations["name"].str.title() logging.info("Creating geopandas DataFrame of station data ...") @@ -42,12 +46,10 @@ def gen_stations_data( geometry=gpd.points_from_xy(master_stations.longitude, master_stations.latitude), crs="EPSG:4326", ).to_crs(epsg=2157) - # if the output - if points_data_fpath != None: - if os.path.exists(points_data_fpath): - logging.info("Writing gis stations data to disk as .pickle file ...") - # pickle the gis stations data - with open(points_data_fpath, "wb") as f: - pickle.dump(geo_master_stations, f, protocol=pickle.HIGHEST_PROTOCOL) - else: - raise ValueError(f"{points_data_fpath} does not exist") + if os.path.exists(points_data_fpath): + logging.info("Writing gis stations data to disk as .pickle file ...") + # pickle the gis stations data + with open(points_data_fpath, "wb") as f: + pickle.dump(geo_master_stations, f, protocol=pickle.HIGHEST_PROTOCOL) + else: + raise ValueError(f"{points_data_fpath} does not exist") diff --git a/webscraper/utilities/gen_preaggregate_data.py b/webscraper/utilities/gen_preagg_data.py similarity index 54% rename from webscraper/utilities/gen_preaggregate_data.py rename to webscraper/utilities/gen_preagg_data.py index ef04595..5088b23 100644 --- a/webscraper/utilities/gen_preaggregate_data.py +++ b/webscraper/utilities/gen_preagg_data.py @@ -7,26 +7,25 @@ from typing import Union @beartype -def 
gen_preaggregate_data( - master_data:Union[pd.DataFrame,None]=None, - preaggregate_data_fpath:Union[str,None]=None +def gen_preagg_data( + master_data_fpath:str=cons.master_data_fpath, + preaggregate_data_fpath:str=cons.preaggregate_data_fpath ): """Generates preaggregate data for bokeh dashboard app Parameters ---------- - master_data : None or pd.DataFrame - Either the master data as a pandas.DataFrame or loads the master data from disk when None, default is None + master_data_fpath : str + The file path to load the master data from disk, default is cons.master_data_fpath preaggregate_data_fpath : str - The file location to write the preaggregated data to disk, default is None + The file location to write the preaggregated data to disk, default is cons.preaggregate_data_fpath Returns ------- """ - if type(master_data) == type(None): - logging.info("Loading master data from disk ...") - # load master data - master_data = pd.read_feather(cons.master_data_fpath) + logging.info("Loading master data from disk ...") + # load master data + master_data = pd.read_feather(master_data_fpath) logging.info("Performing initial data aggregation to year-month level ...") # preaggregate the data to year-month level for each available stat pre_agg_data_dict = {} @@ -41,12 +40,10 @@ agg_dict = {col: stat for col in cons.col_options} tmp_agg_data = agg_data.groupby(group_cols, as_index=False).agg(agg_dict) pre_agg_data_dict[stat] = tmp_agg_data - # if the output - if preaggregate_data_fpath != None: - if os.path.exists(preaggregate_data_fpath): - logging.info("Writing preaggregated data to disk as .pickle file ...") - # pickle the preaggregated data dictionary to disk - with open(cons.preaggregate_data_fpath, "wb") as f: - pickle.dump(pre_agg_data_dict, f, protocol=pickle.HIGHEST_PROTOCOL) - else: - raise ValueError(f"{preaggregate_data_fpath} does not exist") + if os.path.exists(preaggregate_data_fpath): + logging.info("Writing preaggregated data to disk as .pickle file ...") + # pickle the preaggregated data dictionary to disk + with open(preaggregate_data_fpath, "wb") as f: + pickle.dump(pre_agg_data_dict, f, protocol=pickle.HIGHEST_PROTOCOL) + else: + raise ValueError(f"{preaggregate_data_fpath} does not exist") diff --git a/webscraper/utilities/load_stations_data.py b/webscraper/utilities/load_stations_data.py deleted file mode 100644 index 0bf0d82..0000000 --- a/webscraper/utilities/load_stations_data.py +++ /dev/null @@ -1,35 +0,0 @@ -import pandas as pd -from beartype import beartype - -@beartype -def load_stations_data( - stations_fpath:str, - filter_open:bool=True, - topn:int=None - ) -> pd.DataFrame: - """Loads the station reference data file - - Parameters - ---------- - stations_fpath : str - The file path to load the reference station data from disk - filter_open : bool - Whether to only consider open stations and not closed stations - topn : int - The number of rows to filter from the head of the loaded stations data - - - Returns - ------- - pd.DataFrame - The loaded stations reference data - """ - # load stations data - stations = pd.read_csv(stations_fpath) - if filter_open: - # only consider open stations for now - open_stations_filter = stations['close_year'].isnull() - stations = stations.loc[open_stations_filter, :].reset_index(drop=True) - if topn != None: - stations = stations.head(topn) - return stations \ No newline at end of file diff --git a/webscraper/utilities/retrieve_station_data.py
b/webscraper/utilities/retrieve_station_data.py deleted file mode 100644 index 87d2c0c..0000000 --- a/webscraper/utilities/retrieve_station_data.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging -import pandas as pd -from utilities.url_retrieve import url_retrieve -from beartype import beartype - -@beartype -def retrieve_station_data( - stations:pd.DataFrame, - scraped_data_dir:str, - data_level:str="dly" - ) -> list: - """Webscrapes the met data for all station ids in a given stations dataframe - - Parameters - ---------- - stations : pd.DataFrame - The loaded reference stations data - scraped_data_dir : str - The file directory to write the scraped met data to - data_level : str - The time level of the met data to scrape, default is "dly" - - - Returns - ------- - list - A log of the webscrape responses - """ - # iterate over each station and pull daily level data using using stationid - resp_log =[] - for idx, row in stations.iterrows(): - logging.info(f"{idx} {row['county']} {row['station_id']} {row['name']}") - resp = url_retrieve(stationid=row['station_id'], scraped_data_dir=scraped_data_dir, data_level=data_level) - logging.info(resp) - resp_log.append(resp) - return resp_log \ No newline at end of file diff --git a/webscraper/utilities/url_retrieve.py b/webscraper/utilities/url_retrieve.py deleted file mode 100644 index b1de553..0000000 --- a/webscraper/utilities/url_retrieve.py +++ /dev/null @@ -1,34 +0,0 @@ -import os -import urllib.request -from beartype import beartype - -@beartype -def url_retrieve( - stationid:int, - scraped_data_dir:str, - data_level:str="dly" - ): - """Retrieves met data for a given station id - - Parameters - ---------- - stationid : int - The station id to retrieve data for - scraped_data_dir : str - The file directory to write the scraped met data to - data_level : str - The time level of the met data to scrape, default is "dly" - - Returns - ------- - urllib.request.urlretrieve, Exception - A retrieval response - """ - data_fname = f"{data_level}{stationid}.csv" - data_url = f"http://cli.fusio.net/cli/climate_data/webdata/{data_fname}" - download_data_fpath = os.path.join(scraped_data_dir, data_fname) - try: - resp = urllib.request.urlretrieve(data_url, download_data_fpath) - except Exception as e: - resp = e - return resp \ No newline at end of file
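For context, here is a minimal sketch (not part of the diff) of how the refactored pipeline above can be driven directly from Python, assuming it is executed from the webscraper directory so that cons and the utilities package resolve; the stage flags mirror the new CLI options in commandline_interface.py and the updated exeWebscrapeData.bat.

```python
# Minimal usage sketch (assumption: run from the webscraper/ directory).
import logging
from prg_webscrape_data import webscrape_data

# mirror the logging setup used in prg_webscrape_data.py
logging.getLogger().setLevel(logging.INFO)

# run every stage in order; each flag maps to one of the gen_* utilities above
webscrape_data(
    run_met_data=True,     # gen_met_data: scrape the dly station .csv files
    run_clean_data=True,   # gen_clean_data: clean the scraped files (optionally back up to s3)
    run_master_data=True,  # gen_master_data: concatenate cleaned files into the master .feather file
    run_preagg_data=True,  # gen_preagg_data: preaggregate the master data per statistic
    run_map_data=True,     # gen_map_data: build the counties gis pickle
    run_points_data=True,  # gen_points_data: build the stations gis pickle
)
```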
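Similarly, a minimal sketch (not part of the diff) of how the new S3Client and the s3 constants added to cons.py fit together for backing up and restoring a single cleaned station file. It assumes a valid session token file exists at cons.session_token_fpath; the station id 532 is purely illustrative.

```python
# Minimal usage sketch (assumption: run from the webscraper/ directory with valid aws credentials).
import os
import pandas as pd

import cons
from utilities.S3Client import S3Client

station_id = 532  # hypothetical station id, for illustration only
fname = cons.s3_fname.format(station_id=station_id)  # e.g. "dly532.csv"
key = f"{cons.s3_clean_directory}/{fname}"

# create the client from the session token credentials file
s3client = S3Client(sessionToken=cons.session_token_fpath)

# back up one cleaned station file to the s3 bucket
clean_data = pd.read_csv(os.path.join(cons.cleaned_data_dir, fname))
s3client.store(data=clean_data, bucket=cons.s3_bucket, key=key)

# read it back as a DataFrame (returns None and logs the error on failure)
restored = s3client.retrieve(bucket=cons.s3_bucket, key=key)
```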