Updating downloading of comstock data from s3.
rHorsey committed Sep 19, 2024
1 parent 79d6ec1 commit d35ae57
Showing 1 changed file with 43 additions and 20 deletions.
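In short, the commit replaces pandas reads of s3:// URLs (pd.read_parquet followed by to_parquet) with boto3 ServiceResource calls that first verify each object exists and then download it directly to the local data directory. A minimal sketch of that pattern, using placeholder bucket, key, and local path values that are not taken from this run:

import boto3
import botocore.exceptions

s3 = boto3.resource('s3')
bucket = 'example-bucket'                          # placeholder bucket name
key = 'example_run/baseline/results_up00.parquet'  # placeholder object key
local_path = 'results_up00.parquet'                # placeholder local destination

obj = s3.Object(bucket, key)
try:
    obj.load()  # HEAD request; raises ClientError if the object is missing or inaccessible
except botocore.exceptions.ClientError:
    raise FileNotFoundError(f'{key} not found in bucket {bucket}; download it manually to {local_path}')
obj.download_file(local_path)  # managed transfer of the object to the local file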
63 changes: 43 additions & 20 deletions postprocessing/comstockpostproc/comstock.py
@@ -8,6 +8,7 @@
import glob
import json
import logging
import botocore.exceptions
import numpy as np
import pandas as pd
import polars as pl
@@ -116,7 +117,11 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc
self.unweighted_weighted_map = {}
self.dropping_columns = []
self.cached_parquet = [] # List of parquet files to reload and export
# TODO our current credential setup isn't playing well with this approach but does work with the s3 ServiceResource
# We are currently unable to list the HeadObject for automatically uploaded data
# Consider migrating all usage to s3 ServiceResource instead.
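# For reference: Object(bucket, key).load() sends a HEAD request and raises botocore.exceptions.ClientError
# if the object is missing or inaccessible, and Object(bucket, key).download_file(path) performs a managed
# transfer of the object to a local file.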
self.s3_client = boto3.client('s3', config=botocore.client.Config(max_pool_connections=50))
self.s3_resource = boto3.resource('s3')
if self.athena_table_name is not None:
self.athena_client = BuildStockQuery(workgroup='eulp',
db_name='enduse',
@@ -137,7 +142,10 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc
# S3 location
self.s3_inpath = None
if s3_base_dir is not None:
self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.comstock_run_name}"
if self.athena_table_name:
self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.athena_table_name}"
else:
self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.comstock_run_name}"

# Load and transform data, preserving all columns
self.download_data()
@@ -240,13 +248,26 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc
# logger.debug(c)

def download_data(self):
# Get the s3 bucket and prefix to download data from:
if self.s3_inpath is None:
logger.info('The s3 path provided in the ComStock object initialization is invalid.')
s3_path_items = self.s3_inpath.lstrip('s3://').split('/')
bucket_name = s3_path_items[0]
prfx = '/'.join(s3_path_items[1:])
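# Hypothetical example: an s3_inpath of 's3://my-bucket/comstock/my_run/my_run' parses to bucket_name
# 'my-bucket' and prfx 'comstock/my_run/my_run'. (Note that str.lstrip strips a set of characters, so a
# bucket name beginning with 's' or '3' would be truncated here.)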

# baseline/results_up00.parquet
results_data_path = os.path.join(self.data_dir, self.results_file_name)
if not os.path.exists(results_data_path):
s3_path = f"{self.s3_inpath}/baseline/{self.results_file_name}"
logger.info(f'Downloading: {s3_path}')
data = pd.read_parquet(s3_path, engine="pyarrow")
data.to_parquet(results_data_path)
baseline_parquet_path = f"{prfx}/baseline/{self.results_file_name}"
try:
self.s3_resource.Object(bucket_name, baseline_parquet_path).load()
except botocore.exceptions.ClientError:
logger.error(f'Could not find results_up00.parquet at {baseline_parquet_path} in bucket {bucket_name}')
raise FileNotFoundError(
f'Missing results_up00.parquet file. Manually download and place at {results_data_path}'
)
logger.info(f'Downloading {baseline_parquet_path} from the {bucket_name} bucket')
self.s3_resource.Object(bucket_name, baseline_parquet_path).download_file(results_data_path)

# upgrades/upgrade=*/results_up*.parquet
if self.include_upgrades:
@@ -255,13 +276,10 @@ def download_data(self):
logger.info('The s3 path passed to the constructor is invalid, '
'cannot check for results_up**.parquet files to download')
else:
s3_path_items = self.s3_inpath.lstrip('s3://').split('/')
bucket_name = s3_path_items[0]
prfx = '/'.join(s3_path_items[1:])
prfx = f'{prfx}/upgrades'
resp = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prfx)
for obj in resp.get("Contents"):
obj_path = obj['Key']
upgrade_parquet_path = f'{prfx}/upgrades'
resp = self.s3_resource.Bucket(bucket_name).objects.filter(Prefix=upgrade_parquet_path).all()
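# Note: this object collection paginates automatically, so prefixes with more than 1000 keys are still
# listed in full, unlike a single list_objects_v2 call, which returns at most 1000 keys per request.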
for obj in list(resp):
obj_path = obj.key
obj_name = obj_path.split('/')[-1]
m = re.search('results_up(.*).parquet', obj_name)
if not m:
@@ -272,21 +290,26 @@ def download_data(self):
continue
results_data_path = os.path.join(self.data_dir, obj_name)
if not os.path.exists(results_data_path):
s3_path = f"s3://{bucket_name}/{obj_path}"
logger.info(f'Downloading: {s3_path}')
data = pd.read_parquet(s3_path, engine="pyarrow")
data.to_parquet(results_data_path)
logger.info(f'Downloading {obj_path} from the {bucket_name} bucket')
self.s3_resource.Object(bucket_name, obj_path).download_file(results_data_path)

# buildstock.csv
#TODO: handle the missing buildstock.csv in a more robust way
#1. check the file in the data_dir
#2. if not found, download from S3
#3. if not found in S3, raise an error

buildstock_csv_path = os.path.join(self.data_dir, self.buildstock_file_name)
if not os.path.exists(buildstock_csv_path):
raise FileNotFoundError(
f'Missing buildstock.csv file. Manually download and place in {os.path.abspath(self.data_dir)}')
s3_path = f"{self.s3_inpath}/buildstock_csv/buildstock.csv"
bldstk_s3_path = f'{prfx}/buildstock_csv/buildstock.csv'
try:
self.s3_resource.Object(bucket_name, bldstk_s3_path).load()
except botocore.exceptions.ClientError:
logger.error(f'Could not find buildstock.csv at {bldstk_s3_path} in bucket {bucket_name}')
raise FileNotFoundError(
f'Missing buildstock.csv file. Manually download and place at {buildstock_csv_path}'
)
logger.info(f'Downloading {bldstk_s3_path} from the {bucket_name} bucket')
self.s3_resource.Object(bucket_name, bldstk_s3_path).download_file(buildstock_csv_path)

# EJSCREEN
ejscreen_data_path = os.path.join(self.truth_data_dir, self.ejscreen_file_name)