From 301807136ad9f1001080c855455a12d956c728b8 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Fri, 4 Oct 2024 14:58:38 -0400 Subject: [PATCH 1/3] Migration for new data package format --- .../migrations/migration.003.data_packages.py | 171 ++++++++++++++++++ src/handlers/shared/functions.py | 2 +- src/handlers/site_upload/cache_api.py | 10 +- src/handlers/site_upload/powerset_merge.py | 14 +- template.yaml | 10 +- 5 files changed, 193 insertions(+), 14 deletions(-) create mode 100644 scripts/migrations/migration.003.data_packages.py diff --git a/scripts/migrations/migration.003.data_packages.py b/scripts/migrations/migration.003.data_packages.py new file mode 100644 index 0000000..8ea6efd --- /dev/null +++ b/scripts/migrations/migration.003.data_packages.py @@ -0,0 +1,171 @@ +import argparse +import enum +import io +import json +import os + +import awswrangler +import boto3 +import pandas +from rich import progress + + +class JsonFilename(enum.Enum): + """stores names of expected kinds of persisted S3 JSON files""" + + COLUMN_TYPES = "column_types" + TRANSACTIONS = "transactions" + DATA_PACKAGES = "data_packages" + STUDY_PERIODS = "study_periods" + + +class BucketPath(enum.Enum): + """stores root level buckets for managing data processing state""" + + ADMIN = "admin" + AGGREGATE = "aggregates" + ARCHIVE = "archive" + CACHE = "cache" + CSVAGGREGATE = "csv_aggregates" + ERROR = "error" + LAST_VALID = "last_valid" + LATEST = "latest" + META = "metadata" + STUDY_META = "study_metadata" + UPLOAD = "site_upload" + + +def get_csv_column_datatypes(dtypes): + """helper for generating column type for dashboard API""" + column_dict = {} + for column in dtypes.index: + if column.endswith("year"): + column_dict[column] = "year" + elif column.endswith("month"): + column_dict[column] = "month" + elif column.endswith("week"): + column_dict[column] = "week" + elif column.endswith("day") or str(dtypes[column]) == "datetime64": + column_dict[column] = "day" + elif "cnt" in column or str(dtypes[column]) in ( + "Int8", + "Int16", + "Int32", + "Int64", + "UInt8", + "UInt16", + "UInt32", + "UInt64", + ): + column_dict[column] = "integer" + elif str(dtypes[column]) in ("Float32", "Float64"): + column_dict[column] = "float" + elif str(dtypes[column]) == "boolean": + column_dict[column] = "float" + else: + column_dict[column] = "string" + return column_dict + + +def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None: + """Convenience class for writing a dict to S3""" + b_data = io.BytesIO(json.dumps(data).encode()) + client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data) + + +def update_column_type_metadata(bucket: str): + """creates a new metadata dict for column types. + + By design, this will replaces an existing column type dict if one already exists. 
+ """ + client = boto3.client("s3") + res = client.list_objects_v2(Bucket=bucket, Prefix="aggregates/") + contents = res["Contents"] + output = {} + for resource in progress.track(contents): + dirs = resource["Key"].split("/") + study = dirs[1] + subscription = dirs[2].split("__")[1] + version = dirs[3] + bytes_buffer = io.BytesIO() + client.download_fileobj(Bucket=bucket, Key=resource["Key"], Fileobj=bytes_buffer) + df = pandas.read_parquet(bytes_buffer) + type_dict = get_csv_column_datatypes(df.dtypes) + output.setdefault(study, {}) + output[study].setdefault(subscription, {}) + output[study][subscription].setdefault(version, {}) + output[study][subscription][version]["column_types_format_version"] = 2 + output[study][subscription][version]["columns"] = type_dict + output[study][subscription][version]["last_data_update"] = ( + resource["LastModified"].now().isoformat() + ) + output[study][subscription][version]["s3_path"] = resource["Key"][:-8] + ".csv" + output[study][subscription][version]["total"] = int(df["cnt"][0]) + _put_s3_data("metadata/column_types.json", bucket, client, output) + + +def get_s3_json_as_dict(bucket, key: str): + """reads a json object as dict (typically metadata in this case)""" + s3_client = boto3.client("s3") + bytes_buffer = io.BytesIO() + print(bucket) + print(key) + s3_client.download_fileobj( + Bucket=bucket, + Key=key, + Fileobj=bytes_buffer, + ) + return json.loads(bytes_buffer.getvalue().decode()) + + +def cache_api_data(s3_bucket_name: str, db: str) -> None: + s3_client = boto3.client("s3") + df = awswrangler.athena.read_sql_query( + ( + f"SELECT table_name FROM information_schema.tables " # noqa: S608 + f"WHERE table_schema = '{db}'" # nosec + ), + database=db, + s3_output=f"s3://{s3_bucket_name}/awswrangler", + workgroup=os.environ.get("WORKGROUP_NAME"), + ) + data_packages = df[df["table_name"].str.contains("__")].iloc[:, 0] + column_types = get_s3_json_as_dict( + s3_bucket_name, + f"{BucketPath.META.value}/{JsonFilename.COLUMN_TYPES.value}.json", + ) + dp_details = [] + for dp in list(data_packages): + dp_detail = { + "study": dp.split("__", 1)[0], + "name": dp.split("__", 1)[1], + } + try: + versions = column_types[dp_detail["study"]][dp_detail["name"]] + for version in versions: + dp_details.append( + { + **dp_detail, + **versions[version], + "version": version, + "id": dp + "__" + version, + } + ) + except KeyError: + continue + s3_client.put_object( + Bucket=s3_bucket_name, + Key=f"{BucketPath.CACHE.value}/{JsonFilename.DATA_PACKAGES.value}.json", + Body=json.dumps(dp_details), + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="""Creates data package metadata for existing aggregates. 
""" + ) + parser.add_argument("-b", "--bucket", help="bucket name") + parser.add_argument("-d", "--db", help="database name") + args = parser.parse_args() + update_column_type_metadata(args.bucket) + cache_api_data(args.bucket, args.db) diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py index b17bd38..96e76a3 100644 --- a/src/handlers/shared/functions.py +++ b/src/handlers/shared/functions.py @@ -27,7 +27,7 @@ } COLUMN_TYPES_METADATA_TEMPLATE = { - enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "1", + enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "2", enums.ColumnTypesKeys.COLUMNS.value: None, enums.ColumnTypesKeys.LAST_DATA_UPDATE.value: None, } diff --git a/src/handlers/site_upload/cache_api.py b/src/handlers/site_upload/cache_api.py index f3e1e68..641e04c 100644 --- a/src/handlers/site_upload/cache_api.py +++ b/src/handlers/site_upload/cache_api.py @@ -31,14 +31,20 @@ def cache_api_data(s3_client, s3_bucket_name: str, db: str, target: str) -> None dp_details = [] for dp in list(data_packages): dp_detail = { - "id": dp, "study": dp.split("__", 1)[0], "name": dp.split("__", 1)[1], } try: versions = column_types[dp_detail["study"]][dp_detail["name"]] for version in versions: - dp_details.append({**dp_detail, **versions[version], "version": version}) + dp_details.append( + { + **dp_detail, + **versions[version], + "version": version, + "id": dp + "__" + version, + } + ) except KeyError: continue s3_client.put_object( diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py index d08fdc5..d1d05e3 100644 --- a/src/handlers/site_upload/powerset_merge.py +++ b/src/handlers/site_upload/powerset_merge.py @@ -45,6 +45,11 @@ def __init__(self, event): self.s3_bucket_name, meta_type=enums.JsonFilename.COLUMN_TYPES.value, ) + self.csv_aggerate_path = ( + f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/" + f"{self.study}/{self.study}__{self.data_package}/{self.version}/" + f"{self.study}__{self.data_package}__aggregate.csv" + ) # S3 Filesystem operations def get_data_package_list(self, path) -> list: @@ -86,14 +91,9 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None def write_csv(self, df: pandas.DataFrame) -> None: """writes dataframe as csv to s3""" - csv_aggregate_path = ( - f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/" - f"{self.study}/{self.study}__{self.data_package}/{self.version}/" - f"{self.study}__{self.data_package}__aggregate.csv" - ) df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace('""', numpy.nan) df = df.replace(to_replace=r",", value="", regex=True) - awswrangler.s3.to_csv(df, csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE) + awswrangler.s3.to_csv(df, self.csv_aggerate_path, index=False, quoting=csv.QUOTE_NONE) # metadata def update_local_metadata( @@ -345,7 +345,7 @@ def merge_powersets(manager: S3Manager) -> None: value=column_dict, metadata=manager.types_metadata, meta_type=enums.JsonFilename.COLUMN_TYPES.value, - extra_items={"total": int(df["cnt"][0])}, + extra_items={"total": int(df["cnt"][0]), "s3_path": manager.csv_aggerate_path}, ) manager.update_local_metadata( enums.ColumnTypesKeys.LAST_DATA_UPDATE.value, diff --git a/template.yaml b/template.yaml index 11a1753..0317657 100644 --- a/template.yaml +++ b/template.yaml @@ -77,11 +77,12 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggFetchAuthorizer-${DeployStage}' Handler: 
src/handlers/site_upload/api_gateway_authorizer.lambda_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat - LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizerFunction-${DeployStage}" + LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizer-${DeployStage}" MemorySize: 128 Timeout: 100 Description: Validates credentials before providing signed urls @@ -110,12 +111,13 @@ Resources: Type: AWS::Serverless::Function Properties: FunctionName: !Sub 'CumulusAggFetchUploadUrl-${DeployStage}' + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Handler: src/handlers/site_upload/fetch_upload_url.upload_url_handler Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat - LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrlFunction-${DeployStage}" + LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrl-${DeployStage}" MemorySize: 128 Timeout: 100 Description: Generates a presigned URL for uploading files to S3 @@ -156,7 +158,7 @@ Resources: LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat - LogGroup: !Sub "/aws/lambda/CumulusAggProcessUploadFunction-${DeployStage}" + LogGroup: !Sub "/aws/lambda/CumulusAggProcessUpload-${DeployStage}" MemorySize: 128 Timeout: 800 Description: Handles initial relocation of upload data @@ -196,7 +198,7 @@ Resources: LoggingConfig: ApplicationLogLevel: !Ref LogLevel LogFormat: !Ref LogFormat - LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMergeFunction-${DeployStage}" + LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMerge-${DeployStage}" MemorySize: 8192 Timeout: 800 Description: Merges and aggregates powerset count data From 985847c16550ef07c9bb0d77ce3871b2b7319e52 Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Fri, 4 Oct 2024 15:30:19 -0400 Subject: [PATCH 2/3] more layers --- template.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/template.yaml b/template.yaml index 0317657..b902ecb 100644 --- a/template.yaml +++ b/template.yaml @@ -154,6 +154,7 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggProcessUpload-${DeployStage}' Handler: src/handlers/site_upload/process_upload.process_upload_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -392,6 +393,7 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetCsv-${DeployStage}' Handler: src/handlers/dashboard/get_csv.get_csv_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -438,6 +440,7 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetCsvList-${DeployStage}' Handler: src/handlers/dashboard/get_csv.get_csv_list_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -485,6 +488,7 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetMetadata-${DeployStage}' Handler: src/handlers/dashboard/get_metadata.metadata_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -617,6 +621,7 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardStudyPeriods-${DeployStage}' Handler: 
src/handlers/dashboard/get_study_periods.study_periods_handler + Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel From 31017da0444eef6096bbc9a109174c28962b1d6e Mon Sep 17 00:00:00 2001 From: Matt Garber Date: Mon, 7 Oct 2024 09:57:36 -0400 Subject: [PATCH 3/3] moved pandas function to seperate file --- src/handlers/shared/functions.py | 33 -------------------- src/handlers/shared/pandas_functions.py | 35 ++++++++++++++++++++++ src/handlers/site_upload/powerset_merge.py | 10 +++++-- template.yaml | 7 ----- tests/shared/test_functions.py | 4 +-- 5 files changed, 45 insertions(+), 44 deletions(-) create mode 100644 src/handlers/shared/pandas_functions.py diff --git a/src/handlers/shared/functions.py b/src/handlers/shared/functions.py index 96e76a3..3be0b7b 100644 --- a/src/handlers/shared/functions.py +++ b/src/handlers/shared/functions.py @@ -6,7 +6,6 @@ from datetime import UTC, datetime import boto3 -import pandas from src.handlers.shared import enums @@ -221,35 +220,3 @@ def get_latest_data_package_version(bucket, prefix): if highest_ver is None: logging.error("No data package versions found for %s", prefix) return highest_ver - - -def get_column_datatypes(dtypes: pandas.DataFrame): - """helper for generating column type for dashboard API""" - column_dict = {} - for column in dtypes.index: - if column.endswith("year"): - column_dict[column] = "year" - elif column.endswith("month"): - column_dict[column] = "month" - elif column.endswith("week"): - column_dict[column] = "week" - elif column.endswith("day") or str(dtypes[column]) == "datetime64": - column_dict[column] = "day" - elif column.startswith("cnt") or str(dtypes[column]) in ( - "Int8", - "Int16", - "Int32", - "Int64", - "UInt8", - "UInt16", - "UInt32", - "UInt64", - ): - column_dict[column] = "integer" - elif str(dtypes[column]) in ("Float32", "Float64"): - column_dict[column] = "float" - elif str(dtypes[column]) == "boolean": - column_dict[column] = "boolean" - else: - column_dict[column] = "string" - return column_dict diff --git a/src/handlers/shared/pandas_functions.py b/src/handlers/shared/pandas_functions.py new file mode 100644 index 0000000..2ac8569 --- /dev/null +++ b/src/handlers/shared/pandas_functions.py @@ -0,0 +1,35 @@ +"""Pandas functions used across different functions""" + +import pandas + + +def get_column_datatypes(dtypes: pandas.DataFrame): + """helper for generating column type for dashboard API""" + column_dict = {} + for column in dtypes.index: + if column.endswith("year"): + column_dict[column] = "year" + elif column.endswith("month"): + column_dict[column] = "month" + elif column.endswith("week"): + column_dict[column] = "week" + elif column.endswith("day") or str(dtypes[column]) == "datetime64": + column_dict[column] = "day" + elif column.startswith("cnt") or str(dtypes[column]) in ( + "Int8", + "Int16", + "Int32", + "Int64", + "UInt8", + "UInt16", + "UInt32", + "UInt64", + ): + column_dict[column] = "integer" + elif str(dtypes[column]) in ("Float32", "Float64"): + column_dict[column] = "float" + elif str(dtypes[column]) == "boolean": + column_dict[column] = "boolean" + else: + column_dict[column] = "string" + return column_dict diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py index d1d05e3..7f33dd5 100644 --- a/src/handlers/site_upload/powerset_merge.py +++ b/src/handlers/site_upload/powerset_merge.py @@ -12,7 +12,13 @@ import pandas from 
pandas.core.indexes.range import RangeIndex -from src.handlers.shared import awswrangler_functions, decorators, enums, functions +from src.handlers.shared import ( + awswrangler_functions, + decorators, + enums, + functions, + pandas_functions, +) log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO") logger = logging.getLogger() @@ -339,7 +345,7 @@ def merge_powersets(manager: S3Manager) -> None: manager.write_local_metadata() # Updating the typing dict for the column type API - column_dict = functions.get_column_datatypes(df.dtypes) + column_dict = pandas_functions.get_column_datatypes(df.dtypes) manager.update_local_metadata( enums.ColumnTypesKeys.COLUMNS.value, value=column_dict, diff --git a/template.yaml b/template.yaml index b902ecb..4004feb 100644 --- a/template.yaml +++ b/template.yaml @@ -77,7 +77,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggFetchAuthorizer-${DeployStage}' Handler: src/handlers/site_upload/api_gateway_authorizer.lambda_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -111,7 +110,6 @@ Resources: Type: AWS::Serverless::Function Properties: FunctionName: !Sub 'CumulusAggFetchUploadUrl-${DeployStage}' - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Handler: src/handlers/site_upload/fetch_upload_url.upload_url_handler Runtime: "python3.11" LoggingConfig: @@ -154,7 +152,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggProcessUpload-${DeployStage}' Handler: src/handlers/site_upload/process_upload.process_upload_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -393,7 +390,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetCsv-${DeployStage}' Handler: src/handlers/dashboard/get_csv.get_csv_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -440,7 +436,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetCsvList-${DeployStage}' Handler: src/handlers/dashboard/get_csv.get_csv_list_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -488,7 +483,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardGetMetadata-${DeployStage}' Handler: src/handlers/dashboard/get_metadata.metadata_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel @@ -621,7 +615,6 @@ Resources: Properties: FunctionName: !Sub 'CumulusAggDashboardStudyPeriods-${DeployStage}' Handler: src/handlers/dashboard/get_study_periods.study_periods_handler - Layers: [arn:aws:lambda:us-east-1:336392948345:layer:AWSSDKPandas-Python311:17] Runtime: "python3.11" LoggingConfig: ApplicationLogLevel: !Ref LogLevel diff --git a/tests/shared/test_functions.py b/tests/shared/test_functions.py index e106697..339a2bb 100644 --- a/tests/shared/test_functions.py +++ b/tests/shared/test_functions.py @@ -4,7 +4,7 @@ import pandas import pytest -from src.handlers.shared import functions +from src.handlers.shared import functions, pandas_functions @pytest.mark.parametrize( @@ -48,7 +48,7 @@ def test_column_datatypes(): "string": ["string"], } ) - col_types = 
functions.get_column_datatypes(df.dtypes)
+    col_types = pandas_functions.get_column_datatypes(df.dtypes)
     assert col_types == {
         "study_year": "year",
         "study_month": "month",
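
For reference, a minimal sketch of what a single cache/data_packages.json entry looks like after this series. Every value below is a hypothetical placeholder; the keys mirror what cache_api_data() now writes by merging the column_types metadata into each Athena table name, producing one entry per data package version with the version appended to the id.

    # Shape of one entry in cache/data_packages.json after this change.
    # All values are hypothetical placeholders.
    example_entry = {
        "study": "core",                           # hypothetical study name
        "name": "count_encounter_month",           # hypothetical data package name
        "version": "099",                          # hypothetical version string
        "id": "core__count_encounter_month__099",  # id now carries the version suffix
        "column_types_format_version": 2,
        "columns": {"cnt": "integer", "study_month": "month"},
        "last_data_update": "2024-10-04T14:58:38+00:00",
        "s3_path": "aggregates/core/core__count_encounter_month/099/core__count_encounter_month__aggregate.csv",
        "total": 1234,
    }

    # The id is the Athena table name plus the version:
    table_name = "core__count_encounter_month"
    version = "099"
    assert example_entry["id"] == table_name + "__" + version

Note that the migration records s3_path as the bucket-relative aggregate key with a .csv extension, while powerset_merge records the full s3:// path to the CSV aggregate written by write_csv.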
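
A usage sketch for the new migration (the bucket and database names below are hypothetical): it first rebuilds metadata/column_types.json from every parquet under aggregates/, then regenerates cache/data_packages.json from the Athena table list.

    # Hypothetical one-off invocation of scripts/migrations/migration.003.data_packages.py.
    # Replace the bucket and database names with your deployment's values.
    import subprocess

    subprocess.run(
        [
            "python",
            "scripts/migrations/migration.003.data_packages.py",
            "--bucket", "my-aggregator-bucket",  # hypothetical S3 bucket name
            "--db", "my-glue-database",          # hypothetical Athena database name
        ],
        check=True,
    )

The same can be done in-process by calling update_column_type_metadata(bucket) followed by cache_api_data(bucket, db), which is what the script's __main__ block does.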