From 75895644a4b8a9fdff14782533cb0e46e813ee86 Mon Sep 17 00:00:00 2001
From: Matt Garber
Date: Fri, 20 Oct 2023 16:08:10 -0400
Subject: [PATCH] copy parquet to csv in last_valid

---
 src/handlers/site_upload/powerset_merge.py | 25 ++++++++++++++++++++++
 tests/site_upload/test_powerset_merge.py   | 19 ++++++++++------
 2 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/src/handlers/site_upload/powerset_merge.py b/src/handlers/site_upload/powerset_merge.py
index 4cfc25c..b5273d0 100644
--- a/src/handlers/site_upload/powerset_merge.py
+++ b/src/handlers/site_upload/powerset_merge.py
@@ -258,6 +258,31 @@ def merge_powersets(manager: S3Manager) -> None:
                 f"{BucketPath.LATEST.value}/{subbucket_path}",
                 f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
             )
+            ####################
+            # For now, we'll create a csv of the file we just put in last valid.
+            # This is occasionally useful for uploading to the dashboard.
+            # TODO: remove as soon as we support either parquet upload or
+            # the API is supported by the dashboard
+            last_valid_df = awswrangler.s3.read_parquet(
+                f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}"
+                f"/{subbucket_path}"
+            )
+            last_valid_df = last_valid_df.apply(
+                lambda x: x.strip() if isinstance(x, str) else x
+            ).replace('""', nan)
+            last_valid_df = last_valid_df.replace(to_replace=r",", value="", regex=True)
+            awswrangler.s3.to_csv(
+                last_valid_df,
+                (
+                    f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}/"
+                    f"{subbucket_path}".replace(".parquet", ".csv")
+                ),
+                index=False,
+                quoting=csv.QUOTE_NONE,
+            )
+
+            ####################
+
             latest_site = site_specific_name.split("/", maxsplit=1)[0]
             manager.update_local_metadata(
                 TransactionKeys.LAST_DATA_UPDATE.value, site=latest_site
diff --git a/tests/site_upload/test_powerset_merge.py b/tests/site_upload/test_powerset_merge.py
index 74ee1da..7dbe8cf 100644
--- a/tests/site_upload/test_powerset_merge.py
+++ b/tests/site_upload/test_powerset_merge.py
@@ -47,7 +47,7 @@
             f"{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Adding a new data package to a site without uploads
             "./tests/test_data/count_synthea_patient.parquet",
@@ -57,7 +57,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Updating an existing data package
             "./tests/test_data/count_synthea_patient.parquet",
@@ -67,7 +67,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             True,
             200,
-            ITEM_COUNT + 2,
+            ITEM_COUNT + 3,
         ),
         (  # New version of existing data package
             "./tests/test_data/count_synthea_patient.parquet",
@@ -77,7 +77,7 @@
             f"/{NEW_VERSION}/encounter.parquet",
             True,
             200,
-            ITEM_COUNT + 4,
+            ITEM_COUNT + 5,
         ),
         (  # Invalid parquet file
             "./tests/site_upload/test_powerset_merge.py",
@@ -97,7 +97,7 @@
             f"/{EXISTING_VERSION}/encounter.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # ensuring that a data package that is a substring does not get
             # merged by substr match
@@ -108,7 +108,7 @@
             f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet",
             False,
             200,
-            ITEM_COUNT + 3,
+            ITEM_COUNT + 4,
         ),
         (  # Empty file upload
             None,
@@ -222,7 +222,12 @@ def test_powerset_merge_single_upload(
                 )
 
         elif item["Key"].startswith(BucketPath.LAST_VALID.value):
-            assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            if item["Key"].endswith(".parquet"):
+                assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+            elif item["Key"].endswith(".csv"):
+                assert f"{upload_path.replace('.parquet','')}" in item["Key"]
+            else:
+                raise Exception("Invalid csv found at " f"{item['Key']}")
         else:
             assert (
                 item["Key"].startswith(BucketPath.ARCHIVE.value)