Commit 6f43caf (1 parent 85c926d)
Showing 22 changed files with 663 additions and 258 deletions.
@@ -0,0 +1,86 @@
""" Adds a new metadata type, column_types """ | ||
|
||
import argparse | ||
import io | ||
import json | ||
|
||
import boto3 | ||
import pandas | ||
from rich import progress | ||
|
||
|
||
def get_csv_column_datatypes(dtypes):
    """helper for generating column types for the dashboard API"""
    column_dict = {}
    for column in dtypes.index:
        if column.endswith("year"):
            column_dict[column] = "year"
        elif column.endswith("month"):
            column_dict[column] = "month"
        elif column.endswith("week"):
            column_dict[column] = "week"
        elif column.endswith("day") or str(dtypes[column]) == "datetime64":
            column_dict[column] = "day"
        elif "cnt" in column or str(dtypes[column]) in (
            "Int8",
            "Int16",
            "Int32",
            "Int64",
            "UInt8",
            "UInt16",
            "UInt32",
            "UInt64",
        ):
            column_dict[column] = "integer"
        elif str(dtypes[column]) in ("Float32", "Float64"):
            column_dict[column] = "float"
        elif str(dtypes[column]) == "boolean":
            column_dict[column] = "boolean"
        else:
            column_dict[column] = "string"
    return column_dict


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
    """Convenience function for writing a dict to S3"""
    b_data = io.BytesIO(json.dumps(data).encode())
    client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def create_column_type_metadata(bucket: str):
    """creates a new metadata dict for column types.

    By design, this will replace an existing column type dict if one already exists.
    """
    client = boto3.client("s3")
    res = client.list_objects_v2(Bucket=bucket, Prefix="aggregates/")
    contents = res["Contents"]
    output = {}
    for resource in progress.track(contents):
        dirs = resource["Key"].split("/")
        study = dirs[1]
        subscription = dirs[2].split("__")[1]
        version = dirs[3]
        bytes_buffer = io.BytesIO()
        client.download_fileobj(
            Bucket=bucket, Key=resource["Key"], Fileobj=bytes_buffer
        )
        df = pandas.read_parquet(bytes_buffer)
        type_dict = get_csv_column_datatypes(df.dtypes)
        filename = f"{resource['Key'].split('/')[-1].split('.')[0]}.csv"
        output.setdefault(study, {})
        output[study].setdefault(subscription, {})
        output[study][subscription].setdefault(version, {})
        output[study][subscription][version]["columns"] = type_dict
        output[study][subscription][version]["filename"] = filename
    # print(json.dumps(output, indent=2))
    _put_s3_data("metadata/column_types.json", bucket, client, output)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Creates column types for existing aggregates."
    )
    parser.add_argument("-b", "--bucket", help="bucket name")
    args = parser.parse_args()
    create_column_type_metadata(args.bucket)
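For orientation, here is a rough sketch of how this migration script might be run and the shape of the metadata/column_types.json document it writes. The script filename, bucket, study, subscription, and version names below are hypothetical placeholders; the nesting simply mirrors the setdefault() calls above.

# Hypothetical invocation (script and bucket names are examples, not deployment details):
#   python migration.column_types.py -b example-aggregator-bucket
#
# Approximate shape of the resulting metadata/column_types.json, with made-up
# study/subscription/version names for illustration:
example_column_types = {
    "covid": {                                   # study
        "symptoms": {                            # subscription
            "099": {                             # version
                "columns": {
                    "cnt": "integer",
                    "symptom_month": "month",
                    "site": "string",
                },
                "filename": "covid__symptoms__aggregate.csv",
            }
        }
    }
}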
@@ -0,0 +1,97 @@
import os

import boto3

from src.handlers.shared import decorators, enums, functions


def _format_key(
    study: str, subscription: str, version: str, filename: str, site: str | None = None
):
    """Creates S3 key from url params"""
    if site:
        return f"last_valid/{study}/{study}__{subscription}/{site}/{version}/{filename}"
    return f"csv_aggregates/{study}/{study}__{subscription}/{version}/{filename}"


def _get_column_types(
    s3_client,
    s3_bucket_name: str,
    study: str,
    subscription: str,
    version: str,
    **kwargs,
) -> dict:
    """Gets column types from the metadata store for a given subscription"""
    types_metadata = functions.read_metadata(
        s3_client,
        s3_bucket_name,
        meta_type=enums.JsonFilename.COLUMN_TYPES.value,
    )
    try:
        return types_metadata[study][subscription][version][
            enums.ColumnTypesKeys.COLUMNS.value
        ]
    except KeyError:
        return {}


@decorators.generic_error_handler(msg="Error retrieving chart data")
def get_csv_handler(event, context):
    """manages event from dashboard api call and creates a temporary URL"""
    del context
    s3_bucket_name = os.environ.get("BUCKET_NAME")
    s3_client = boto3.client("s3")
    key = _format_key(**event["pathParameters"])
    types = _get_column_types(s3_client, s3_bucket_name, **event["pathParameters"])
    presign_url = s3_client.generate_presigned_url(
        "get_object",
        Params={
            "Bucket": s3_bucket_name,
            "Key": key,
            "ResponseContentType": "text/csv",
        },
        ExpiresIn=600,
    )
    res = {
        "Headers": {
            "Location": presign_url,
            "x-column-names": ",".join(types.keys()),
            "x-column-types": ",".join(types.values()),
            # TODO: add x-column-descriptions once a source for column descriptions
            # has been established
        }
    }
    res = functions.http_response(302, res)
    return res


@decorators.generic_error_handler(msg="Error retrieving chart data")
def get_csv_list_handler(event, context):
    """manages event from dashboard api call and creates a list of available CSVs"""
    del context
    s3_bucket_name = os.environ.get("BUCKET_NAME")
    s3_client = boto3.client("s3")

    types_metadata = functions.read_metadata(
        s3_client,
        s3_bucket_name,
        meta_type=enums.JsonFilename.COLUMN_TYPES.value,
    )
    urls = []
    for study in types_metadata:
        for subscription in types_metadata[study]:
            for version in types_metadata[study][subscription]:
                urls.append(
                    "/".join(
                        (
                            study,
                            subscription,
                            version,
                            types_metadata[study][subscription][version]["filename"],
                        )
                    )
                )

    res = functions.http_response(200, urls, allow_cors=True)
    return res
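As a usage illustration, the sketch below shows the kind of Lambda-style event get_csv_handler expects, inferred only from the _format_key signature above; every value (bucket, study, subscription, version, filename, site) is a placeholder rather than a real deployment detail, and it assumes the shared decorators/enums/functions modules are importable.

import os

# Placeholder environment and event; assumes the referenced aggregate exists
# in the bucket named by BUCKET_NAME.
os.environ["BUCKET_NAME"] = "example-aggregator-bucket"  # hypothetical bucket
event = {
    "pathParameters": {
        "study": "covid",                        # hypothetical study
        "subscription": "symptoms",              # hypothetical subscription
        "version": "099",                        # hypothetical version
        "filename": "covid__symptoms__aggregate.csv",
        # "site": "example_site",                # optional; selects the last_valid/ key
    }
}
response = get_csv_handler(event, context=None)
# On success this is a 302 response whose Location header is a presigned S3 URL,
# with x-column-names and x-column-types headers describing the CSV.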