
27 boto3 s3 client #29


Merged: 7 commits, Oct 17, 2024
1 change: 1 addition & 0 deletions .dockerignore
@@ -1,5 +1,6 @@
data/Met_Eireann/arch
data/Met_Eireann/scraped_data
data/Met_Eireann/cleaned_data
*__pycache__
*.ipynb_checkpoints
*.xlsx
1 change: 1 addition & 0 deletions .gitignore
@@ -1,6 +1,7 @@
# ignore sub repos
data/Met_Eireann/arch
data/Met_Eireann/scraped_data
data/Met_Eireann/cleaned_data
*__pycache__
*.ipynb_checkpoints
*.xlsx
Binary file modified data/master.feather
Binary file not shown.
2 changes: 1 addition & 1 deletion exeWebscrapeData.bat
@@ -1 +1 @@
call python webscraper\prg_webscrape_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data
call python webscraper\prg_webscrape_data.py --run_met_data --run_clean_data --run_master_data --run_preagg_data --run_map_data --run_points_data
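For anyone migrating scripts that call the old entry point, the renamed utilities imply the flag correspondence sketched below. This mapping is inferred from the diff rather than documented in the PR, so treat it as an assumption:

```python
# Inferred old-flag -> new-flag correspondence (an assumption based on the
# renamed utilities in this PR, not documented anywhere in the repository).
flag_mapping = [
    ("--retrieve_data", "--run_met_data"),
    (None, "--run_clean_data"),  # new cleaning stage, no old equivalent
    ("--generate_master_data", "--run_master_data"),
    ("--generate_preaggregated_data", "--run_preagg_data"),
    ("--generate_counties_data", "--run_map_data"),
    ("--generate_stations_data", "--run_points_data"),
]

# print a migration cheat sheet
for old, new in flag_mapping:
    print(f"{old or '(none)'} -> {new}")
```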
55 changes: 0 additions & 55 deletions webscraper/arch/gen_boto3_excel.py

This file was deleted.

11 changes: 10 additions & 1 deletion webscraper/cons.py
@@ -10,6 +10,7 @@
sys.path.append(root_dir)
# set directories
data_dir = os.path.join(root_dir, 'data')
creds_data = os.path.join(root_dir, '.creds')
gis_dir = os.path.join(data_dir, "gis")
met_eireann_dir = os.path.join(data_dir, 'Met_Eireann')
bokeh_ref_data_dir = os.path.join(data_dir, "bokeh", "ref")
@@ -23,16 +23,24 @@
map_data_fpath = os.path.join(gis_dir, "map_data.pickle")
points_data_fpath = os.path.join(gis_dir, "points_data.pickle")
scraped_data_dir = os.path.join(met_eireann_dir, 'scraped_data')
cleaned_data_dir = os.path.join(met_eireann_dir, 'cleaned_data')
stations_fpath = os.path.join(met_eireann_dir, 'ref', 'StationDetails.csv')
unittest_normal_dists_fpath = os.path.join(bokeh_ref_data_dir, "unittest_normal_dists.json")
col_options_fpath = os.path.join(bokeh_ref_data_dir, "col_options.json")
stat_options_fpath = os.path.join(bokeh_ref_data_dir, "stat_options.json")
agg_level_strftime_fpath = os.path.join(bokeh_ref_data_dir, "agg_level_strftime.json")
session_token_fpath = os.path.join(creds_data, "sessionToken.json")

# load bokeh reference data
with open(col_options_fpath) as json_file:
    col_options = json.load(json_file)
with open(stat_options_fpath) as json_file:
    stat_options = json.load(json_file)
with open(agg_level_strftime_fpath) as json_file:
    date_strftime_dict = json.load(json_file)
    date_strftime_dict = json.load(json_file)

# aws s3 constants
s3_bucket = "irishclimatedashboard"
s3_scraped_directory = "data/Met_Eireann/scraped_data"
s3_clean_directory = "data/Met_Eireann/cleaned_data"
s3_fname = "dly{station_id}.csv"
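As a reference for how these constants fit together, here is a minimal sketch that builds a full object key for one station; the station id 1075 and the composition logic are illustrative assumptions, not code from this PR:

```python
# Sketch: compose an s3 object key from the constants above (assumption).
s3_bucket = "irishclimatedashboard"
s3_scraped_directory = "data/Met_Eireann/scraped_data"
s3_fname = "dly{station_id}.csv"

# station id 1075 is an arbitrary illustrative value
key = f"{s3_scraped_directory}/{s3_fname.format(station_id=1075)}"
print(f"s3://{s3_bucket}/{key}")
# -> s3://irishclimatedashboard/data/Met_Eireann/scraped_data/dly1075.csv
```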
109 changes: 74 additions & 35 deletions webscraper/prg_webscrape_data.py
@@ -3,63 +3,99 @@
import time
from beartype import beartype
from utilities.commandline_interface import commandline_interface
from utilities.load_stations_data import load_stations_data
from utilities.retrieve_station_data import retrieve_station_data
from utilities.gen_met_data import gen_met_data
from utilities.gen_clean_data import gen_clean_data
from utilities.gen_master_data import gen_master_data
from utilities.gen_preaggregate_data import gen_preaggregate_data
from utilities.gen_counties_data import gen_counties_data
from utilities.gen_stations_data import gen_stations_data
from utilities.gen_preagg_data import gen_preagg_data
from utilities.gen_map_data import gen_map_data
from utilities.gen_points_data import gen_points_data

@beartype
def webscrape_data(
    retrieve_data:bool,
    generate_master_data:bool,
    generate_preaggregated_data:bool,
    generate_counties_data:bool,
    generate_stations_data:bool
    run_met_data:bool,
    run_clean_data:bool,
    run_master_data:bool,
    run_preagg_data:bool,
    run_map_data:bool,
    run_points_data:bool
):
"""Webscrape and process met data into dashboard files

Parameters
----------
retrieve_data : bool
run_met_data : bool
Retrieves / web scrapes the historical met data
generate_master_data : bool
run_clean_data : bool
Cleans and processes the scraped met data
run_master_data : bool
Generates the master data file from the retrieved / web scraped met data files
generate_preaggregated_data : bool
run_preagg_data : bool
Preaggreates the master data file into various date levels for the bokeh dashboard app
generate_counties_data : bool
Generates the counties gis file for the bokeh dashboard app
generate_stations_data : bool
run_map_data : bool
Generates the map gis file for the bokeh dashboard app
run_points_data : bool
Generates the stations gis file for the bokeh dashboard app

Returns
-------
"""
    # start timer
    t0 = time.time()
    if retrieve_data:

    if run_met_data:
        logging.info('~~~~~ Retrieving data for met stations ...')
        # load stations data
        stations = load_stations_data(stations_fpath=cons.stations_fpath, filter_open=True)
        # run webscraper
        resp_log = retrieve_station_data(stations=stations, scraped_data_dir=cons.scraped_data_dir, data_level="dly")
    if generate_master_data:
        gen_met_data(
            stations_fpath=cons.stations_fpath,
            filter_open=True,
            topn_stations=5,
            scraped_data_dir=cons.scraped_data_dir, data_level="dly"
        )

    if run_clean_data:
        logging.info('~~~~~ Cleaning met stations data ...')
        # run data cleaning
        gen_clean_data(
            scraped_data_dir=cons.scraped_data_dir,
            cleaned_data_dir=cons.cleaned_data_dir,
            store_on_s3=False
        )

    if run_master_data:
        logging.info('~~~~~ Generating master data file ...')
        # generate master data file
        gen_master_data(master_data_fpath = cons.master_data_fpath)
    if generate_preaggregated_data:
        gen_master_data(
            cleaned_data_dir=cons.cleaned_data_dir,
            master_data_fpath=cons.master_data_fpath
        )

    if run_preagg_data:
        logging.info('~~~~~ Generating preaggregated data file ...')
        # generate the preaggregate data
        gen_preaggregate_data(preaggregate_data_fpath = cons.preaggregate_data_fpath)
    if generate_counties_data:
        logging.info('~~~~~ Generating geospatial counties data file ...')
        gen_preagg_data(
            master_data_fpath=cons.master_data_fpath,
            preaggregate_data_fpath=cons.preaggregate_data_fpath
        )

    if run_map_data:
        logging.info('~~~~~ Generating geospatial map data file ...')
        # generate counties data
        gen_counties_data(map_data_fpath = cons.map_data_fpath)
    if generate_stations_data:
        logging.info('~~~~~ Generating geospatial stations data file ...')
        gen_map_data(
            rep_counties_fpath=cons.rep_counties_fpath,
            ni_counties_fpath=cons.ni_counties_fpath,
            preaggregate_data_fpath=cons.preaggregate_data_fpath,
            map_data_fpath=cons.map_data_fpath
        )

    if run_points_data:
        logging.info('~~~~~ Generating geospatial points data file ...')
        # generate weather station points data
        gen_stations_data(points_data_fpath = cons.points_data_fpath)
        gen_points_data(
            master_data_fpath=cons.master_data_fpath,
            stations_fpath=cons.stations_fpath,
            points_data_fpath=cons.points_data_fpath
        )

    # end timer and log result
    t1 = time.time()
    tres = t1 - t0
@@ -71,13 +107,16 @@ def webscrape_data(
    # set up logging
    lgr = logging.getLogger()
    lgr.setLevel(logging.INFO)

    # handle input parameters
    input_params_dict = commandline_interface()

    # call webscrape data
    webscrape_data(
        retrieve_data=input_params_dict['retrieve_data'],
        generate_master_data=input_params_dict['generate_master_data'],
        generate_preaggregated_data=input_params_dict['generate_preaggregated_data'],
        generate_counties_data=input_params_dict['generate_counties_data'],
        generate_stations_data=input_params_dict['generate_stations_data']
        run_met_data=input_params_dict['run_met_data'],
        run_clean_data=input_params_dict['run_clean_data'],
        run_master_data=input_params_dict['run_master_data'],
        run_preagg_data=input_params_dict['run_preagg_data'],
        run_map_data=input_params_dict['run_map_data'],
        run_points_data=input_params_dict['run_points_data']
    )
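To see the refactored stages end to end, here is a hedged usage sketch that drives the pipeline directly from Python rather than through the commandline; the flag names match the new interface, but calling the function this way is illustrative only:

```python
# A usage sketch (assumption): run every stage of the refactored pipeline.
from prg_webscrape_data import webscrape_data

webscrape_data(
    run_met_data=True,     # scrape historical met data
    run_clean_data=True,   # clean the scraped files (optionally mirrored to s3)
    run_master_data=True,  # consolidate cleaned files into the master data file
    run_preagg_data=True,  # pre-aggregate master data by date level
    run_map_data=True,     # build the map gis file
    run_points_data=True   # build the station points gis file
)
```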
84 changes: 84 additions & 0 deletions webscraper/utilities/S3Client.py
@@ -0,0 +1,84 @@
import io
import boto3
import json
import logging
import pandas as pd
from typing import Union
from beartype import beartype

class S3Client():

    @beartype
    def __init__(self, sessionToken:str):
        # load aws config
        with open(sessionToken, "r") as j:
            aws_config = json.loads(j.read())
        # connect to aws boto3
        self.session = boto3.Session(
            aws_access_key_id=aws_config['Credentials']["AccessKeyId"],
            aws_secret_access_key=aws_config['Credentials']["SecretAccessKey"],
            aws_session_token=aws_config['Credentials']["SessionToken"],
            region_name="eu-west-1"
        )
        # generate boto3 s3 connection
        self.client = self.session.client("s3")

    @beartype
    def store(
        self,
        data:pd.DataFrame,
        key:str,
        bucket:str="irishclimateapp"
    ):
        """Stores a raw Met Eireann data file on s3.

        Parameters
        ----------
        data : pd.DataFrame
            The Met Eireann data to store
        key : str
            The s3 key to store the Met Eireann data file under
        bucket : str
            The s3 bucket storing the Met Eireann data files

        Returns
        -------
        """
        try:
            logging.info(f"Storing data to S3://{bucket}/{key}")
            csv_buf = io.StringIO()
            data.to_csv(csv_buf, header=True, index=False)
            csv_buf.seek(0)
            self.client.put_object(Bucket=bucket, Body=csv_buf.getvalue(), Key=key)
        except Exception as e:
            logging.info(str(e))

    @beartype
    def retrieve(
        self,
        key:str,
        bucket:str="irishclimateapp"
    ):
        """Retrieves a raw Met Eireann data file from AWS s3.

        Parameters
        ----------
        key : str
            The s3 key containing the Met Eireann data file
        bucket : str
            The s3 bucket containing the Met Eireann data file

        Returns
        -------
        pd.DataFrame
            The raw Met Eireann data, or None if retrieval failed
        """
        data = None
        try:
            logging.info(f"Retrieving data from S3://{bucket}/{key}")
            # retrieve the s3 object from the bucket
            obj = self.client.get_object(Bucket=bucket, Key=key)
            # parse the csv file in the response body
            data = pd.read_csv(obj["Body"])
        except Exception as e:
            logging.info(str(e))
        return data
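A short usage sketch for the new client follows. It assumes valid temporary credentials in the .creds/sessionToken.json file referenced by cons.py (shaped like the Credentials block that __init__ reads); the sample frame, station id, and round-trip flow are illustrative:

```python
import pandas as pd
import cons
from utilities.S3Client import S3Client

# build the client from the session token file referenced in cons.py
s3client = S3Client(sessionToken=cons.session_token_fpath)

# round-trip a small sample frame (values and station id are illustrative)
df = pd.DataFrame({"date": ["2024-01-01"], "maxtp": [11.2]})
key = f"{cons.s3_scraped_directory}/{cons.s3_fname.format(station_id=1075)}"
s3client.store(data=df, key=key, bucket=cons.s3_bucket)
retrieved = s3client.retrieve(key=key, bucket=cons.s3_bucket)
```

Note that both methods swallow exceptions and log them at INFO level, so a failed store or retrieve will not raise; retrieve simply returns None on failure.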
28 changes: 12 additions & 16 deletions webscraper/utilities/commandline_interface.py
@@ -4,12 +4,6 @@
def commandline_interface():
    """A commandline interface for parsing input parameters with

    Windows
    python IrishClimateDashboard\\webscraper\\prg_webscraper_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data

    Linux
    python3 IrishClimateDashboard/webscraper/prg_webscraper_data.py --retrieve_data --generate_master_data --generate_preaggregated_data --generate_counties_data --generate_stations_data

    Parameters
    ----------

@@ -21,19 +15,21 @@ def commandline_interface():
    # define argument parser object
    parser = argparse.ArgumentParser(description="Execute the Irish Climate Dashboard webscraping programme.")
    # add input arguments
    parser.add_argument("--retrieve_data", action=argparse.BooleanOptionalAction, dest="retrieve_data", type=bool, default=False, help="Boolean, retrieves / web scrapes the historical met data",)
    parser.add_argument("--generate_master_data", action=argparse.BooleanOptionalAction, dest="generate_master_data", type=bool, default=False, help="Boolean, generates the master data file from the retrieved / web scraped met data files",)
    parser.add_argument("--generate_preaggregated_data", action=argparse.BooleanOptionalAction, dest="generate_preaggregated_data", type=bool, default=False, help="Boolean, pre-aggregates the master data file into various date levels for the bokeh dashboard app",)
    parser.add_argument("--generate_counties_data", action=argparse.BooleanOptionalAction, dest="generate_counties_data", type=bool, default=False, help="Boolean, generates the counties gis file for the bokeh dashboard app",)
    parser.add_argument("--generate_stations_data", action=argparse.BooleanOptionalAction, dest="generate_stations_data", type=bool, default=False, help="Boolean, generates the stations gis file for the bokeh dashboard app",)
    parser.add_argument("--run_met_data", action=argparse.BooleanOptionalAction, dest="run_met_data", type=bool, default=False, help="Boolean, retrieves / web scrapes the historical met data",)
    parser.add_argument("--run_clean_data", action=argparse.BooleanOptionalAction, dest="run_clean_data", type=bool, default=False, help="Boolean, cleans and processes the scraped met data",)
    parser.add_argument("--run_master_data", action=argparse.BooleanOptionalAction, dest="run_master_data", type=bool, default=False, help="Boolean, generates the master data file from the retrieved / web scraped met data files",)
    parser.add_argument("--run_preagg_data", action=argparse.BooleanOptionalAction, dest="run_preagg_data", type=bool, default=False, help="Boolean, pre-aggregates the master data file into various date levels for the bokeh dashboard app",)
    parser.add_argument("--run_map_data", action=argparse.BooleanOptionalAction, dest="run_map_data", type=bool, default=False, help="Boolean, generates the map gis file for the bokeh dashboard app",)
    parser.add_argument("--run_points_data", action=argparse.BooleanOptionalAction, dest="run_points_data", type=bool, default=False, help="Boolean, generates the stations gis file for the bokeh dashboard app",)
    # create an output dictionary to hold the results
    input_params_dict = {}
    # extract input arguments
    args = parser.parse_args()
    # map input arguments into output dictionary
    input_params_dict["retrieve_data"] = args.retrieve_data
    input_params_dict["generate_master_data"] = args.generate_master_data
    input_params_dict["generate_preaggregated_data"] = args.generate_preaggregated_data
    input_params_dict["generate_counties_data"] = args.generate_counties_data
    input_params_dict["generate_stations_data"] = args.generate_stations_data
    input_params_dict["run_met_data"] = args.run_met_data
    input_params_dict["run_clean_data"] = args.run_clean_data
    input_params_dict["run_master_data"] = args.run_master_data
    input_params_dict["run_preagg_data"] = args.run_preagg_data
    input_params_dict["run_map_data"] = args.run_map_data
    input_params_dict["run_points_data"] = args.run_points_data
    return input_params_dict
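Since argparse.BooleanOptionalAction is used, each switch also gets an automatic --no- negation. A quick illustrative check of the parsing behaviour (not part of the PR):

```python
# Illustrative check of the new flags (assumption: run from webscraper/).
import sys
from utilities.commandline_interface import commandline_interface

# BooleanOptionalAction flags default to False and accept a --no- negation
sys.argv = ["prg_webscrape_data.py", "--run_met_data", "--no-run_points_data"]
params = commandline_interface()
print(params["run_met_data"])     # True
print(params["run_points_data"])  # False
```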