copy parquet to csv in last_valid
dogversioning committed Oct 20, 2023
1 parent 0eb3a1d commit 7589564
Showing 2 changed files with 37 additions and 7 deletions.
25 changes: 25 additions & 0 deletions src/handlers/site_upload/powerset_merge.py
@@ -258,6 +258,31 @@ def merge_powersets(manager: S3Manager) -> None:
f"{BucketPath.LATEST.value}/{subbucket_path}",
f"{BucketPath.LAST_VALID.value}/{subbucket_path}",
)
####################
# For now, we'll create a csv of the file we just put in last valid.
# This is occasionally useful for uploading to the dashboard.
# TODO: remove as soon as we support either parquet upload or
# the API is supported by the dashboard
last_valid_df = awswrangler.s3.read_parquet(
f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}"
f"/{subbucket_path}"
)
last_valid_df = last_valid_df.apply(
lambda x: x.strip() if isinstance(x, str) else x
).replace('""', nan)
last_valid_df = last_valid_df.replace(to_replace=r",", value="", regex=True)
awswrangler.s3.to_csv(
last_valid_df,
(
f"s3://{manager.s3_bucket_name}/{BucketPath.LAST_VALID.value}/"
f"{subbucket_path}".replace(".parquet", ".csv")
),
index=False,
quoting=csv.QUOTE_NONE,
)

####################

latest_site = site_specific_name.split("/", maxsplit=1)[0]
manager.update_local_metadata(
TransactionKeys.LAST_DATA_UPDATE.value, site=latest_site
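A note on the sanitization in the new block: csv.QUOTE_NONE writes fields without any quote characters, so a literal comma left in a cell would shift every later column in that row, which is why commas are stripped before the write. Below is a minimal, standalone sketch of the same transformation, assuming only pandas and numpy; the file name, column names, and values are invented for illustration.

# Standalone sketch of the CSV sanitization above; assumes only pandas
# and numpy. Column names and values are invented for illustration.
import csv

from numpy import nan
import pandas as pd

df = pd.DataFrame({"site": [" hospital_a ", '""'], "cnt": ["1,204", "87"]})

# Element-wise strip of string cells (applymap, not apply, so the lambda
# sees scalars rather than whole columns); bare '""' markers become NA.
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)
df = df.replace('""', nan)

# QUOTE_NONE writes no quote characters, so any literal comma would
# corrupt the row; remove commas before writing.
df = df.replace(to_replace=r",", value="", regex=True)
df.to_csv("encounter.csv", index=False, quoting=csv.QUOTE_NONE)

Note that DataFrame.apply passes whole columns to its callable, so an isinstance(x, str) guard there never fires; applymap is the element-wise variant that makes the strip actually run.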
19 changes: 12 additions & 7 deletions tests/site_upload/test_powerset_merge.py
@@ -47,7 +47,7 @@
f"{EXISTING_VERSION}/encounter.parquet",
False,
200,
ITEM_COUNT + 3,
ITEM_COUNT + 4,
),
( # Adding a new data package to a site without uploads
"./tests/test_data/count_synthea_patient.parquet",
@@ -57,7 +57,7 @@
f"/{EXISTING_VERSION}/encounter.parquet",
False,
200,
ITEM_COUNT + 3,
ITEM_COUNT + 4,
),
( # Updating an existing data package
"./tests/test_data/count_synthea_patient.parquet",
@@ -67,7 +67,7 @@
f"/{EXISTING_VERSION}/encounter.parquet",
True,
200,
ITEM_COUNT + 2,
ITEM_COUNT + 3,
),
( # New version of existing data package
"./tests/test_data/count_synthea_patient.parquet",
@@ -77,7 +77,7 @@
f"/{NEW_VERSION}/encounter.parquet",
True,
200,
ITEM_COUNT + 4,
ITEM_COUNT + 5,
),
( # Invalid parquet file
"./tests/site_upload/test_powerset_merge.py",
@@ -97,7 +97,7 @@
f"/{EXISTING_VERSION}/encounter.parquet",
False,
200,
ITEM_COUNT + 3,
ITEM_COUNT + 4,
),
( # ensuring that a data package that is a substring does not get
# merged by substr match
@@ -108,7 +108,7 @@
f"{EXISTING_SITE}/{EXISTING_VERSION}/encount.parquet",
False,
200,
ITEM_COUNT + 3,
ITEM_COUNT + 4,
),
( # Empty file upload
None,
@@ -222,7 +222,12 @@ def test_powerset_merge_single_upload(
 )
 
 elif item["Key"].startswith(BucketPath.LAST_VALID.value):
-    assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+    if item["Key"].endswith(".parquet"):
+        assert item["Key"] == (f"{BucketPath.LAST_VALID.value}{upload_path}")
+    elif item["Key"].endswith(".csv"):
+        assert upload_path.replace(".parquet", "") in item["Key"]
+    else:
+        raise Exception(f"Invalid file found at {item['Key']}")
 else:
     assert (
         item["Key"].startswith(BucketPath.ARCHIVE.value)
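The expected item counts in the parametrized cases each rise by one because the merge handler now writes a .csv next to every .parquet it promotes into last_valid, i.e. one extra S3 object per successful upload. A minimal sketch (illustrative only, not part of this commit) of the parquet/CSV pairing the new assertion branch checks:

# Illustrative helper: given the keys under last_valid from a bucket
# listing, require the sibling .csv the handler now writes. The key
# layout below is hypothetical.
def check_csv_siblings(keys: list[str]) -> None:
    csv_keys = {k for k in keys if k.endswith(".csv")}
    for key in keys:
        if key.endswith(".parquet"):
            assert key.replace(".parquet", ".csv") in csv_keys, key


check_csv_siblings(
    [
        "last_valid/encounter/site_a/099/encounter.parquet",
        "last_valid/encounter/site_a/099/encounter.csv",
    ]
)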
