Migration for new data package format #122

Merged: 3 commits, Oct 7, 2024
171 changes: 171 additions & 0 deletions scripts/migrations/migration.003.data_packages.py
@@ -0,0 +1,171 @@
import argparse
import enum
import io
import json
import os

import awswrangler
import boto3
import pandas
from rich import progress


class JsonFilename(enum.Enum):
"""stores names of expected kinds of persisted S3 JSON files"""

COLUMN_TYPES = "column_types"
TRANSACTIONS = "transactions"
DATA_PACKAGES = "data_packages"
STUDY_PERIODS = "study_periods"


class BucketPath(enum.Enum):
"""stores root level buckets for managing data processing state"""

ADMIN = "admin"
AGGREGATE = "aggregates"
ARCHIVE = "archive"
CACHE = "cache"
CSVAGGREGATE = "csv_aggregates"
ERROR = "error"
LAST_VALID = "last_valid"
LATEST = "latest"
META = "metadata"
STUDY_META = "study_metadata"
UPLOAD = "site_upload"


def get_csv_column_datatypes(dtypes):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif "cnt" in column or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "float"
else:
column_dict[column] = "string"
return column_dict


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def update_column_type_metadata(bucket: str):
"""creates a new metadata dict for column types.

By design, this replaces an existing column type dict if one already exists.
"""
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket, Prefix="aggregates/")
contents = res["Contents"]
output = {}
for resource in progress.track(contents):
dirs = resource["Key"].split("/")
study = dirs[1]
subscription = dirs[2].split("__")[1]
version = dirs[3]
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket, Key=resource["Key"], Fileobj=bytes_buffer)
df = pandas.read_parquet(bytes_buffer)
type_dict = get_csv_column_datatypes(df.dtypes)
output.setdefault(study, {})
output[study].setdefault(subscription, {})
output[study][subscription].setdefault(version, {})
output[study][subscription][version]["column_types_format_version"] = 2
output[study][subscription][version]["columns"] = type_dict
output[study][subscription][version]["last_data_update"] = (
resource["LastModified"].now().isoformat()
)
output[study][subscription][version]["s3_path"] = resource["Key"][:-8] + ".csv"
output[study][subscription][version]["total"] = int(df["cnt"][0])
_put_s3_data("metadata/column_types.json", bucket, client, output)


def get_s3_json_as_dict(bucket, key: str):
"""reads a json object as dict (typically metadata in this case)"""
s3_client = boto3.client("s3")
bytes_buffer = io.BytesIO()
s3_client.download_fileobj(
Bucket=bucket,
Key=key,
Fileobj=bytes_buffer,
)
return json.loads(bytes_buffer.getvalue().decode())


def cache_api_data(s3_bucket_name: str, db: str) -> None:
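"""builds the data package cache for the dashboard API from Athena table names and column type metadata"""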
s3_client = boto3.client("s3")
df = awswrangler.athena.read_sql_query(
(
f"SELECT table_name FROM information_schema.tables " # noqa: S608
f"WHERE table_schema = '{db}'" # nosec
),
database=db,
s3_output=f"s3://{s3_bucket_name}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
data_packages = df[df["table_name"].str.contains("__")].iloc[:, 0]
column_types = get_s3_json_as_dict(
s3_bucket_name,
f"{BucketPath.META.value}/{JsonFilename.COLUMN_TYPES.value}.json",
)
dp_details = []
for dp in list(data_packages):
dp_detail = {
"study": dp.split("__", 1)[0],
"name": dp.split("__", 1)[1],
}
try:
versions = column_types[dp_detail["study"]][dp_detail["name"]]
for version in versions:
dp_details.append(
{
**dp_detail,
**versions[version],
"version": version,
"id": dp + "__" + version,
}
)
except KeyError:
continue
s3_client.put_object(
Bucket=s3_bucket_name,
Key=f"{BucketPath.CACHE.value}/{JsonFilename.DATA_PACKAGES.value}.json",
Body=json.dumps(dp_details),
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Creates data package metadata for existing aggregates. """
)
parser.add_argument("-b", "--bucket", help="bucket name")
parser.add_argument("-d", "--db", help="database name")
args = parser.parse_args()
update_column_type_metadata(args.bucket)
cache_api_data(args.bucket, args.db)
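To run the migration against an existing deployment, an invocation along the lines of python scripts/migrations/migration.003.data_packages.py -b my-bucket -d my-db should work (bucket and database names here are hypothetical; AWS credentials, and optionally WORKGROUP_NAME, must be available in the environment). As a hedged sketch with hypothetical study and package names, each entry the script writes to cache/data_packages.json looks roughly like this; note that the version is now embedded in "id":

# One hypothetical entry in cache/data_packages.json after migration;
# fields follow cache_api_data above, values are illustrative only.
entry = {
    "study": "core",
    "name": "patient",
    "version": "001",
    "id": "core__patient__001",
    "column_types_format_version": 2,
    "columns": {"gender": "string", "cnt": "integer"},
    "last_data_update": "2024-10-07T00:00:00+00:00",
    "s3_path": "aggregates/core/core__patient/001/core__patient__aggregate.csv",
    "total": 1234,
}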
35 changes: 1 addition & 34 deletions src/handlers/shared/functions.py
@@ -6,7 +6,6 @@
from datetime import UTC, datetime

import boto3
import pandas

from src.handlers.shared import enums

@@ -27,7 +26,7 @@
}

COLUMN_TYPES_METADATA_TEMPLATE = {
enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "1",
enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "2",
Comment on lines -30 to +29

Contributor:
Can you explain to a n00b what implications this has?

Contributor Author:
At :some: point - it might be useful for a downstream consumer to realize the format has changed. Right now, it's primarily documentation.

This version change does mean that insight needs to be updated to expect a different kind of data back.
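For concreteness, a hedged sketch of the version-2 structure that update_column_type_metadata (in the migration script above) writes to metadata/column_types.json, with hypothetical study, subscription, and version names; the s3_path field is the most visible addition a downstream consumer would need to expect:

# Hypothetical version-2 column_types.json; nesting is study ->
# subscription -> version, per the migration script above.
column_types = {
    "core": {
        "patient": {
            "001": {
                "column_types_format_version": 2,
                "columns": {"gender": "string", "cnt": "integer"},
                "last_data_update": "2024-10-07T00:00:00+00:00",
                "s3_path": "aggregates/core/core__patient/001/core__patient__aggregate.csv",
                "total": 1234,
            }
        }
    }
}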

enums.ColumnTypesKeys.COLUMNS.value: None,
enums.ColumnTypesKeys.LAST_DATA_UPDATE.value: None,
}
@@ -221,35 +220,3 @@ def get_latest_data_package_version(bucket, prefix):
if highest_ver is None:
logging.error("No data package versions found for %s", prefix)
return highest_ver


def get_column_datatypes(dtypes: pandas.DataFrame):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif column.startswith("cnt") or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "boolean"
else:
column_dict[column] = "string"
return column_dict
35 changes: 35 additions & 0 deletions src/handlers/shared/pandas_functions.py
@@ -0,0 +1,35 @@
"""Pandas functions used across different functions"""

import pandas


def get_column_datatypes(dtypes: pandas.DataFrame):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif column.startswith("cnt") or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "boolean"
else:
column_dict[column] = "string"
return column_dict
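A minimal usage sketch of the relocated helper (column names are hypothetical; dtype strings follow pandas' nullable extension types):

import pandas

from src.handlers.shared import pandas_functions

# Suffix-named columns map to date granularities; "cnt" prefixes and
# nullable integer dtypes map to "integer".
df = pandas.DataFrame(
    {
        "study_week": ["2024-01-01"],
        "cnt": pandas.array([5], dtype="Int64"),
        "is_flagged": pandas.array([True], dtype="boolean"),
    }
)
print(pandas_functions.get_column_datatypes(df.dtypes))
# {'study_week': 'week', 'cnt': 'integer', 'is_flagged': 'boolean'}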
10 changes: 8 additions & 2 deletions src/handlers/site_upload/cache_api.py
@@ -31,14 +31,20 @@ def cache_api_data(s3_client, s3_bucket_name: str, db: str, target: str) -> None
dp_details = []
for dp in list(data_packages):
dp_detail = {
"id": dp,
"study": dp.split("__", 1)[0],
"name": dp.split("__", 1)[1],
}
try:
versions = column_types[dp_detail["study"]][dp_detail["name"]]
for version in versions:
dp_details.append({**dp_detail, **versions[version], "version": version})
dp_details.append(
{
**dp_detail,
**versions[version],
"version": version,
"id": dp + "__" + version,
}
)
except KeyError:
continue
s3_client.put_object(
24 changes: 15 additions & 9 deletions src/handlers/site_upload/powerset_merge.py
@@ -12,7 +12,13 @@
import pandas
from pandas.core.indexes.range import RangeIndex

from src.handlers.shared import awswrangler_functions, decorators, enums, functions
from src.handlers.shared import (
awswrangler_functions,
decorators,
enums,
functions,
pandas_functions,
)

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
logger = logging.getLogger()
@@ -45,6 +51,11 @@ def __init__(self, event):
self.s3_bucket_name,
meta_type=enums.JsonFilename.COLUMN_TYPES.value,
)
self.csv_aggregate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)

# S3 Filesystem operations
def get_data_package_list(self, path) -> list:
@@ -86,14 +97,9 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None

def write_csv(self, df: pandas.DataFrame) -> None:
"""writes dataframe as csv to s3"""
csv_aggregate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)
df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace('""', numpy.nan)
df = df.replace(to_replace=r",", value="", regex=True)
awswrangler.s3.to_csv(df, csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE)
awswrangler.s3.to_csv(df, self.csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE)

# metadata
def update_local_metadata(
@@ -339,13 +345,13 @@ def merge_powersets(manager: S3Manager) -> None:
manager.write_local_metadata()

# Updating the typing dict for the column type API
column_dict = functions.get_column_datatypes(df.dtypes)
column_dict = pandas_functions.get_column_datatypes(df.dtypes)
manager.update_local_metadata(
enums.ColumnTypesKeys.COLUMNS.value,
value=column_dict,
metadata=manager.types_metadata,
meta_type=enums.JsonFilename.COLUMN_TYPES.value,
extra_items={"total": int(df["cnt"][0])},
extra_items={"total": int(df["cnt"][0]), "s3_path": manager.csv_aggerate_path},
)
manager.update_local_metadata(
enums.ColumnTypesKeys.LAST_DATA_UPDATE.value,
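A hedged sketch of the path the manager now builds once in __init__ and reuses for both the CSV write and the new s3_path metadata field (bucket, study, package, and version values are hypothetical; "csv_aggregates" is BucketPath.CSVAGGREGATE):

# Hypothetical values; the pattern follows S3Manager.__init__ above.
s3_bucket_name = "cumulus-aggregator"
study, data_package, version = "core", "patient", "001"
csv_aggregate_path = (
    f"s3://{s3_bucket_name}/csv_aggregates/"
    f"{study}/{study}__{data_package}/{version}/"
    f"{study}__{data_package}__aggregate.csv"
)
# -> s3://cumulus-aggregator/csv_aggregates/core/core__patient/001/core__patient__aggregate.csv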
8 changes: 4 additions & 4 deletions template.yaml
@@ -81,7 +81,7 @@ Resources:
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizerFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizer-${DeployStage}"
MemorySize: 128
Timeout: 100
Description: Validates credentials before providing signed urls
@@ -115,7 +115,7 @@
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrlFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrl-${DeployStage}"
MemorySize: 128
Timeout: 100
Description: Generates a presigned URL for uploading files to S3
@@ -156,7 +156,7 @@
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggProcessUploadFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggProcessUpload-${DeployStage}"
MemorySize: 128
Timeout: 800
Description: Handles initial relocation of upload data
@@ -196,7 +196,7 @@
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMergeFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMerge-${DeployStage}"
MemorySize: 8192
Timeout: 800
Description: Merges and aggregates powerset count data
4 changes: 2 additions & 2 deletions tests/shared/test_functions.py
@@ -4,7 +4,7 @@
import pandas
import pytest

from src.handlers.shared import functions
from src.handlers.shared import functions, pandas_functions


@pytest.mark.parametrize(
Expand Down Expand Up @@ -48,7 +48,7 @@ def test_column_datatypes():
"string": ["string"],
}
)
col_types = functions.get_column_datatypes(df.dtypes)
col_types = pandas_functions.get_column_datatypes(df.dtypes)
assert col_types == {
"study_year": "year",
"study_month": "month",