From 23e105540495465326e52f9fa54ff361a9560308 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:24:35 -0400
Subject: [PATCH 01/14] implement

---
 claims_hosp/delphi_claims_hosp/patch.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 claims_hosp/delphi_claims_hosp/patch.py

diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
new file mode 100644
index 000000000..e69de29bb

From 39c433e27b2db7de63683bfb3c6208a850d127bd Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:24:48 -0400
Subject: [PATCH 02/14] implement

---
 claims_hosp/delphi_claims_hosp/patch.py | 71 +++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index e69de29bb..6fd6e7390 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,71 @@
+"""
+This module is used for patching data in the delphi_doctor_visits package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+    "common": {
+        ...
+    },
+    "validation": {
+        ...
+    },
+    "patch": {
+        "patch_dir": "/covidcast-indicators/doctor_visits/AprilPatch",
+        "start_issue": "2024-04-20",
+        "end_issue": "2024-04-21"
+    }
+}
+
+It will generate data for that range of issue dates, and store them in batch issue format:
+[name-of-patch]/issue_[issue-date]/doctor-visits/actual_data_file.csv
+"""
+
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .run import run_module
+
+
+def patch():
+    """
+    Run the doctor visits indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "start_issue": str, YYYY-MM-DD format, first issue date
+        - "end_issue": str, YYYY-MM-DD format, last issue date
+        - "patch_dir": str, directory to write all issues output
+    """
+    params = read_params()
+    logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])
+
+    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+
+    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
+    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
+    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
+
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+
+    current_issue = start_issue
+
+    while current_issue <= end_issue:
+        logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")
+
+        params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
+
+        current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits"""
+        makedirs(f"{current_issue_dir}", exist_ok=True)
+        params["common"]["export_dir"] = f"""{current_issue_dir}"""
+
+        run_module(params, logger)
+        current_issue += timedelta(days=1)
+
+
+if __name__ == "__main__":
+    patch()

From cbc7894bcf4d8657086545a5972fc6ad2fac5030 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 22 Aug 2024 15:57:46 -0400
Subject: [PATCH 03/14] implementation done

---
 .../download_claims_ftp_files.py        |  5 +++--
 .../get_latest_claims_name.py           |  4 ++--
 claims_hosp/delphi_claims_hosp/patch.py | 11 +++++++----
 claims_hosp/delphi_claims_hosp/run.py   | 17 +++++++++++------
 4 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
index 2ce093488..5c9019035 100644
--- a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
+++ b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -53,9 +53,10 @@ def change_date_format(name):
     return name
 
-def download(ftp_credentials, out_path, logger):
+def download(ftp_credentials, out_path, logger, issue_date=None):
     """Pull the latest raw files."""
-    current_time = datetime.datetime.now()
+    current_time = issue_date if issue_date else datetime.datetime.now()
+
     seconds_in_day = 24 * 60 * 60
     logger.info("starting download", time=current_time)
 
diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
index e417183c7..ab03cbd14 100644
--- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
+++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -5,9 +5,9 @@
 import datetime
 from pathlib import Path
 
-def get_latest_filename(dir_path, logger):
+def get_latest_filename(dir_path, logger, issue_date=None):
     """Get the latest filename from the list of downloaded raw files."""
-    current_date = datetime.datetime.now()
+    current_date = issue_date if issue_date else datetime.datetime.now()
 
     files = list(Path(dir_path).glob("*"))
     latest_timestamp = datetime.datetime(1900, 1, 1)
diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index 6fd6e7390..d3de10c33 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -45,16 +45,19 @@ def patch():
     start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
     end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
 
-    logger.info(f"""Start patching {params["patch"]["patch_dir"]}""")
-    logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""")
-    logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""")
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=start_issue.strftime("%Y-%m-%d"),
+        end_issue=end_issue.strftime("%Y-%m-%d"),
+    )
 
     makedirs(params["patch"]["patch_dir"], exist_ok=True)
 
     current_issue = start_issue
 
     while current_issue <= end_issue:
-        logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""")
+        logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))
 
         params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
 
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 53c4cd33b..39ea56d2f 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -23,7 +23,7 @@
 from .backfill import (store_backfill_file, merge_backfill_file)
 
-def run_module(params):
+def run_module(params, logger=None):
     """
     Generate updated claims-based hospitalization indicator values.
 
@@ -54,19 +54,24 @@
         adjustments (False).
     """
     start_time = time.time()
-    logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+    issue_date_str = params.get("patch", {}).get("current_issue", None)
+    issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
+    if not logger:
+        logger = get_structured_logger(
+            __name__,
+            filename=params["common"].get("log_filename"),
+            log_exceptions=params["common"].get("log_exceptions", True),
+        )
 
     # pull latest data
     download(params["indicator"]["ftp_credentials"],
-             params["indicator"]["input_dir"], logger)
+             params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # aggregate data
     modify_and_write(params["indicator"]["input_dir"], logger)
 
     # find the latest files (these have timestamps)
-    claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)
+    claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # handle range of estimates to produce
     # filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz

From 5b9e69a04614eb13e953b087be7d0ebd8a0c285f Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 27 Aug 2024 09:52:52 -0400
Subject: [PATCH 04/14] fixed typo and conditional

---
 claims_hosp/delphi_claims_hosp/get_latest_claims_name.py | 4 ++--
 claims_hosp/delphi_claims_hosp/patch.py                  | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
index ab03cbd14..eae5763ed 100644
--- a/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
+++ b/claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -23,8 +23,8 @@ def get_latest_filename(dir_path, logger, issue_date=None):
             if timestamp <= current_date:
                 latest_timestamp = timestamp
                 latest_filename = file
-
-    assert current_date.date() == latest_timestamp.date(), "no drop for today"
+    if issue_date is None:
+        assert current_date.date() == latest_timestamp.date(), "no drop for today"
 
     logger.info("Latest claims file", filename=latest_filename)
 
diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index d3de10c33..104bd90aa 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -62,7 +62,7 @@ def patch():
         params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")
 
         current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
-        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/doctor-visits"""
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions"""
         makedirs(f"{current_issue_dir}", exist_ok=True)
         params["common"]["export_dir"] = f"""{current_issue_dir}"""
 

From 1bb84d8888c13068737cb4054d2910a989af3d6d Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 28 Aug 2024 16:24:27 -0400
Subject: [PATCH 05/14] feat: adding patching with backfill

---
 claims_hosp/delphi_claims_hosp/backfill.py  | 64 ++++++++++++++--
 claims_hosp/delphi_claims_hosp/run.py       | 24 ++++--
 claims_hosp/tests/test_backfill.py          | 75 ++++++++++++++++++-
 .../tests/test_download_claims_ftp_files.py | 23 +++++-
 claims_hosp/tests/test_patch.py             | 31 ++++++++
 5 files changed, 200 insertions(+), 17 deletions(-)
 create mode 100644 claims_hosp/tests/test_patch.py

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 495abd59b..75e6603fa 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -5,15 +5,18 @@
 Created: 2022-08-03
 """
-import os
+
 import glob
+import os
+import re
+import shutil
 from datetime import datetime
+from typing import Union
 
 # third party
 import pandas as pd
 from delphi_utils import GeoMapper
-
 from .config import Config
 
 gmpr = GeoMapper()
@@ -69,9 +72,58 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
         "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
     # Store intermediate file into the backfill folder
     backfilldata.to_parquet(path, index=False)
+    return path
+
+
+def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
+    """
+    Merge existing backfill with the patch data included.
+    Parameters
+    ----------
+    issue_date : datetime
+        The issue date of the data to be patched
+    backfill_dir : str
+        specified path to store backfill files.
+    backfill_file : str
+
+    """
+
+    new_files = glob.glob(backfill_dir + "/claims_hosp_*")
+
+    def get_file_with_date(files) -> Union[str, None]:
+        for filename in files:
+            pattern = re.findall(r"\d{8}", filename)
+            if len(pattern) == 2:
+                start_date = datetime.strptime(pattern[0], "%Y%m%d")
+                end_date = datetime.strptime(pattern[1], "%Y%m%d")
+                if start_date <= issue_date or end_date <= issue_date:
+                    return filename
+        return ""
+
+    file_name = get_file_with_date(new_files)
+
+    if len(file_name) == 0:
+        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
+        return
+
+    # Start to merge files
+    merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
+    try:
+        shutil.copyfile(file_name, merge_file)
+        existing_df = pd.read_parquet(merge_file, engine="pyarrow")
+        df = pd.read_parquet(backfill_file, engine="pyarrow")
+        merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
+        merged_df.to_parquet(merge_file, index=False)
+        os.remove(file_name)
+        os.rename(merge_file, file_name)
+    # pylint: disable=W0703
+    except Exception as e:
+        os.remove(merge_file)
+        logger.error(e)
+    return
+
 
-def merge_backfill_file(backfill_dir, backfill_merge_day, today,
-                        test_mode=False, check_nd=25):
+def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
     """
     Merge ~4 weeks' backfill data into one file.
 
@@ -80,7 +132,7 @@
     threshold to allow flexibility in data delivery.
     Parameters
     ----------
-    today : datetime
+    most_recent : datetime
         The most recent date when the raw data is received
     backfill_dir : str
         specified path to store backfill files.
@@ -109,7 +161,7 @@ def get_date(file_link):
 
     # Check whether to merge
     # Check the number of files that are not merged
-    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+    if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd:
         return
 
     # Start to merge files
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index 39ea56d2f..b58aea934 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -5,22 +5,24 @@
 when the module is run with `python -m delphi_claims_hosp`.
 """
 
+import os
+
 # standard packages
 import time
-import os
 from datetime import datetime, timedelta
 from pathlib import Path
 
 # third party
 from delphi_utils import get_structured_logger
 
+from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file
+
 # first party
 from .config import Config
 from .download_claims_ftp_files import download
-from .modify_claims_drops import modify_and_write
 from .get_latest_claims_name import get_latest_filename
+from .modify_claims_drops import modify_and_write
 from .update_indicator import ClaimsHospIndicatorUpdater
-from .backfill import (store_backfill_file, merge_backfill_file)
 
 
 def run_module(params, logger=None):
@@ -56,6 +58,10 @@ def run_module(params, logger=None):
     start_time = time.time()
     issue_date_str = params.get("patch", {}).get("current_issue", None)
     issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
+    # safety check: patch parameters may exist in the file even when not running a custom patch
+    custom_run_flag = (
+        False if not params["indicator"].get("custom_run", False) else params["indicator"].get("custom_run", False)
+    )
     if not logger:
         logger = get_structured_logger(
             __name__,
@@ -64,8 +70,7 @@ def run_module(params, logger=None):
         )
 
     # pull latest data
-    download(params["indicator"]["ftp_credentials"],
-             params["indicator"]["input_dir"], logger, issue_date=issue_date)
+    download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)
 
     # aggregate data
     modify_and_write(params["indicator"]["input_dir"], logger)
@@ -99,8 +104,13 @@ def run_module(params, logger=None):
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
         backfill_merge_day = params["indicator"]["backfill_merge_day"]
-        merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
-        store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+        if custom_run_flag:
+            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+            merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
+
+        else:
+            merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
+            store_backfill_file(claims_file, dropdate_dt, backfill_dir)
 
     # print out information
     logger.info("Loaded params",
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index fcd908461..e8bb5c257 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -1,6 +1,9 @@
+import logging
 import os
 import glob
 from datetime import datetime
+from pathlib import Path
+import shutil
 
 # third party
 import pandas as pd
@@ -8,20 +11,22 @@
 
 # first party
 from delphi_claims_hosp.config import Config, GeoConstants
-from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file
+from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files
 
 CONFIG = Config()
 CONSTANTS = GeoConstants()
+TEST_PATH = Path(__file__).parent
 
 PARAMS = {
     "indicator": {
-        "input_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
-        "backfill_dir": "./backfill",
+        "input_file": f"{TEST_PATH}/test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz",
+        "backfill_dir": f"{TEST_PATH}/backfill",
         "drop_date": "2020-06-11",
     }
 }
 DATA_FILEPATH = PARAMS["indicator"]["input_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 backfill_dir = PARAMS["indicator"]["backfill_dir"]
+TEST_LOGGER = logging.getLogger()
 
 class TestBackfill:
@@ -95,3 +100,67 @@ def test_merge_backfill_file(self):
             os.remove(backfill_dir + "/" + fn)
         assert fn not in os.listdir(backfill_dir)
+
+    def test_merge_existing_backfill_files(self):
+        issue_date = datetime(year=2020, month=6, day=13)
+        issue_date_str = issue_date.strftime("%Y%m%d")
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(11, 15):
+                dropdate = datetime(2020, 6, d)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+
+            today = datetime(2020, 6, 14)
+            # creating expected file
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=2)
+            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
+            os.rename(original, f"{backfill_dir}/expected.parquet")
+
+            # creating backfill without issue date
+            os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
+            today = datetime(2020, 6, 14)
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=2)
+
+            old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+            for file in old_files:
+                os.remove(file)
+
+        prep_backfill_data()
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+
+        expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")
+
+        check_diff = expected.merge(merged, how='left', indicator=True)
+        assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0]
+        for file in glob.glob(backfill_dir + "/*.parquet"):
+            os.remove(file)
+
+
+    def test_merge_existing_backfill_files_no_call(self):
+        issue_date = datetime(year=2020, month=6, day=20)
+        issue_date_str = issue_date.strftime("%Y%m%d")
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(11, 15):
+                dropdate = datetime(2020, 6, d)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+
+            today = datetime(2020, 6, 14)
+            # creating expected file
+            merge_backfill_file(backfill_dir, today.weekday(), today,
+                                test_mode=True, check_nd=8)
+
+        prep_backfill_data()
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+
+        old_files = glob.glob(backfill_dir + "*.parquet")
+        for file in old_files:
+            os.remove(file)
+
+
+
diff --git a/claims_hosp/tests/test_download_claims_ftp_files.py b/claims_hosp/tests/test_download_claims_ftp_files.py
index 3cde21ee5..dc60299bf 100644
--- a/claims_hosp/tests/test_download_claims_ftp_files.py
+++ b/claims_hosp/tests/test_download_claims_ftp_files.py
@@ -1,19 +1,40 @@
 # standard
 import datetime
 import re
+from mock import MagicMock, patch
+import logging
 
 # third party
 import numpy as np
+from freezegun import freeze_time
 
 # first party
 from delphi_claims_hosp.download_claims_ftp_files import (change_date_format,
-                                                          get_timestamp)
+                                                          get_timestamp, download)
 
 OLD_FILENAME_TIMESTAMP = re.compile(
     r".*EDI_AGG_INPATIENT_[0-9]_(?P[0-9]*)_(?P[0-9]*)[^0-9]*")
 NEW_FILENAME_TIMESTAMP = re.compile(r".*EDI_AGG_INPATIENT_(?P[0-9]*)_(?P[0-9]*)[^0-9]*")
+TEST_LOGGER = logging.getLogger()
 
 class TestDownloadClaimsFtpFiles:
+
+    @patch('delphi_claims_hosp.download_claims_ftp_files.paramiko.SSHClient')
+    @patch('delphi_claims_hosp.download_claims_ftp_files.path.exists', return_value=False)
+    def test_download(self, mock_exists, mock_sshclient):
+        mock_sshclient_instance = MagicMock()
+        mock_sshclient.return_value = mock_sshclient_instance
+        mock_sftp = MagicMock()
+        mock_sshclient_instance.open_sftp.return_value = mock_sftp
+        mock_sftp.listdir_attr.return_value = [MagicMock(filename="SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz")]
+        ftp_credentials = {"host": "test_host", "user": "test_user", "pass": "test_pass", "port": "test_port"}
+        out_path = "./test_data/"
+
+        issue_date = datetime.datetime(2020, 11, 7)
+        download(ftp_credentials, out_path, TEST_LOGGER, issue_date=issue_date)
+        mock_sshclient_instance.connect.assert_called_once_with(ftp_credentials["host"], username=ftp_credentials["user"], password=ftp_credentials["pass"], port=ftp_credentials["port"])
+        mock_sftp.get.assert_called()
+
     def test_change_date_format(self):
         name = "SYNEDI_AGG_INPATIENT_20200611_1451CDT"
 
diff --git a/claims_hosp/tests/test_patch.py b/claims_hosp/tests/test_patch.py
new file mode 100644
index 000000000..37688bda2
--- /dev/null
+++ b/claims_hosp/tests/test_patch.py
@@ -0,0 +1,31 @@
+import unittest
+from unittest.mock import patch as mock_patch
+from delphi_claims_hosp.patch import patch
+import os
+import shutil
+
+class TestPatchModule:
+    def test_patch(self):
+        with mock_patch('delphi_claims_hosp.patch.get_structured_logger') as mock_get_structured_logger, \
+             mock_patch('delphi_claims_hosp.patch.read_params') as mock_read_params, \
+             mock_patch('delphi_claims_hosp.patch.run_module') as mock_run_module:
+
+            mock_read_params.return_value = {
+                "common": {
+                    "log_filename": "test.log"
+                },
+                "patch": {
+                    "start_issue": "2021-01-01",
+                    "end_issue": "2021-01-02",
+                    "patch_dir": "./patch_dir"
+                }
+            }
+
+            patch()
+
+            assert os.path.isdir('./patch_dir')
+            assert os.path.isdir('./patch_dir/issue_20210101/hospital-admissions')
+            assert os.path.isdir('./patch_dir/issue_20210102/hospital-admissions')
+
+            # Clean up the created directories after the test
+            shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])
\ No newline at end of file

From 3d5701cd6de30812bf7d603d32a37c6fbb3ecf3b Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 30 Aug 2024 12:54:30 -0400
Subject: [PATCH 06/14] lint

---
 claims_hosp/delphi_claims_hosp/backfill.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 75e6603fa..91c812091 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -78,6 +78,7 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     """
     Merge existing backfill with the patch data included.
+
     Parameters
     ----------
     issue_date : datetime
         The issue date of the data to be patched
     backfill_dir : str
         specified path to store backfill files.
     backfill_file : str
-
     """
-
     new_files = glob.glob(backfill_dir + "/claims_hosp_*")
 
     def get_file_with_date(files) -> Union[str, None]:

From 230086ab41f7bc126765d66be29af79c92f5a309 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 6 Sep 2024 10:18:30 -0400
Subject: [PATCH 07/14] suggested changes

---
 claims_hosp/delphi_claims_hosp/backfill.py | 3 +++
 claims_hosp/delphi_claims_hosp/patch.py    | 7 ++++---
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 91c812091..b8a9b05d4 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -79,6 +79,9 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     """
     Merge existing backfill with the patch data included.
 
+    When the indicator fails for some reason or another, there's a gap in the backfill files.
+    The patch to fill in the missing dates happens later down the line when the backfill files are already merged.
+    This function takes the merged file with the missing date, inserts the particular date, and merges the file back.
     Parameters
     ----------
     issue_date : datetime
diff --git a/claims_hosp/delphi_claims_hosp/patch.py b/claims_hosp/delphi_claims_hosp/patch.py
index d3de10c33..b812bce24 100644
--- a/claims_hosp/delphi_claims_hosp/patch.py
+++ b/claims_hosp/delphi_claims_hosp/patch.py
@@ -1,17 +1,18 @@
 """
-This module is used for patching data in the delphi_doctor_visits package.
+This module is used for patching data in the delphi_claims_hosp package.
 
 To use this module, you need to specify the range of issue dates in params.json, like so:
 
 {
     "common": {
+        "custom_run" : true,
         ...
     },
     "validation": {
         ...
     },
     "patch": {
-        "patch_dir": "/covidcast-indicators/doctor_visits/AprilPatch",
+        "patch_dir": "/covidcast-indicators/hospital-admissions/patch",
         "start_issue": "2024-04-20",
         "end_issue": "2024-04-21"
     }
 }
@@ -31,7 +32,7 @@
 
 def patch():
     """
-    Run the doctor visits indicator for a range of issue dates.
+    Run the hospital-admissions indicator for a range of issue dates.
 
     The range of issue dates is specified in params.json using the following keys:
     - "patch": Only used for patching data

From 7bd15be93d85de222beed94042207d3515f17d7a Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Fri, 6 Sep 2024 10:21:13 -0400
Subject: [PATCH 08/14] suggested changes

---
 claims_hosp/delphi_claims_hosp/backfill.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index b8a9b05d4..98bb8cfb6 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -77,7 +77,7 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     """
-    Merge existing backfill with the patch data included.
+    Merge existing backfill with the patch data included. This function is specifically run for patching.
 
     When the indicator fails for some reason or another, there's a gap in the backfill files.
     The patch to fill in the missing dates happens later down the line when the backfill files are already merged.
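Note: a minimal, self-contained sketch of the file-matching rule that merge_existing_backfill_files relies on at this point in the series. Merged files embed two 8-digit dates (from/to) while daily files embed only one, which is what the len(pattern) == 2 check keys on. The directory listing below is hypothetical, for illustration only.

    import re
    from datetime import datetime

    # hypothetical backfill directory contents
    files = [
        "backfill/claims_hosp_from_20200611_to_20200614.parquet",  # merged file
        "backfill/claims_hosp_as_of_20200620.parquet",             # daily file
    ]
    issue_date = datetime(2020, 6, 13)

    for filename in files:
        stamps = re.findall(r"\d{8}", filename)
        if len(stamps) == 2:  # only merged files carry both a start and an end date
            start = datetime.strptime(stamps[0], "%Y%m%d")
            end = datetime.strptime(stamps[1], "%Y%m%d")
            if start <= issue_date or end <= issue_date:
                print("issue would be merged into:", filename)  # matches the first file

This date-range test is rewritten into a month-window check in PATCH 09 below, once the merged files switch to monthly naming.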
From b71dd820e7cd6623363ec5d122fc0c9c39884f3a Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Tue, 15 Oct 2024 10:55:11 -0400
Subject: [PATCH 09/14] making backfill to monthly in progress

---
 changehc/delphi_changehc/patch.py          |  0
 claims_hosp/delphi_claims_hosp/backfill.py | 40 ++++++++--------------
 claims_hosp/delphi_claims_hosp/run.py      |  2 +-
 claims_hosp/tests/test_backfill.py         | 35 +++++++------------
 4 files changed, 28 insertions(+), 49 deletions(-)
 create mode 100644 changehc/delphi_changehc/patch.py

diff --git a/changehc/delphi_changehc/patch.py b/changehc/delphi_changehc/patch.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 98bb8cfb6..ff1bb6877 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -10,11 +10,12 @@
 import os
 import re
 import shutil
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Union
 
 # third party
 import pandas as pd
+import pytz
 from delphi_utils import GeoMapper
 
 from .config import Config
@@ -94,11 +95,12 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
 
     def get_file_with_date(files) -> Union[str, None]:
         for filename in files:
-            pattern = re.findall(r"\d{8}", filename)
-            if len(pattern) == 2:
-                start_date = datetime.strptime(pattern[0], "%Y%m%d")
-                end_date = datetime.strptime(pattern[1], "%Y%m%d")
-                if start_date <= issue_date or end_date <= issue_date:
+            pattern = re.findall(r"\d{6}", filename)
+            if len(pattern) == 1:
+                file_month = datetime.strptime(pattern[0], "%Y%m")
+                start_date = file_month.replace(day=1)
+                end_date = (start_date + timedelta(days=32)).replace(day=1)
+                if issue_date >= start_date and issue_date < end_date:
                     return filename
         return ""
 
@@ -125,27 +127,17 @@ def get_file_with_date(files) -> Union[str, None]:
     return
 
-def merge_backfill_file(backfill_dir, backfill_merge_day, most_recent, test_mode=False, check_nd=25):
+def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
     """
-    Merge ~4 weeks' backfill data into one file.
+    Merge a month's source data into one file.
 
-    Usually this function should merge 28 days' data into a new file so as to
-    save the reading time when running the backfill pipelines. We set a softer
-    threshold to allow flexibility in data delivery.
     Parameters
     ----------
     most_recent : datetime
         The most recent date when the raw data is received
     backfill_dir : str
         specified path to store backfill files.
-    backfill_merge_day: int
-        The day of a week that we used to merge the backfill files. e.g. 0
-        is Monday.
     test_mode: bool
-    check_nd: int
-        The criteria of the number of unmerged files. Ideally, we want the
-        number to be 28, but we use a looser criteria from practical
-        considerations
     """
     new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
     if len(new_files) == 0: # if no any daily file is stored
         return
 
     def get_date(file_link):
@@ -158,23 +150,19 @@ def get_date(file_link):
         return datetime.strptime(fn, "%Y%m%d")
 
     date_list = list(map(get_date, new_files))
-    earliest_date = min(date_list)
     latest_date = max(date_list)
-
-    # Check whether to merge
-    # Check the number of files that are not merged
-    if most_recent.weekday() != backfill_merge_day or (most_recent - earliest_date).days <= check_nd:
+    if latest_date.month == most_recent.month:
+        logger.info("Not a new month; skipping merging")
         return
+
     # Start to merge files
     pdList = []
     for fn in new_files:
         df = pd.read_parquet(fn, engine='pyarrow')
         pdList.append(df)
     merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
-    path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
-        datetime.strftime(earliest_date, "%Y%m%d"),
-        datetime.strftime(latest_date, "%Y%m%d"))
+    path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
     merged_file.to_parquet(path, index=False)
 
     # Delete daily files once we have the merged one.
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index b58aea934..fe8d76fda 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -109,7 +109,7 @@ def run_module(params, logger=None):
             merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
 
         else:
-            merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
+            merge_backfill_file(backfill_dir, datetime.today())
             store_backfill_file(claims_file, dropdate_dt, backfill_dir)
 
     # print out information
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index e8bb5c257..ec0de4e5c 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -3,7 +3,6 @@
 import glob
 from datetime import datetime
 from pathlib import Path
-import shutil
 
 # third party
 import pandas as pd
@@ -49,16 +48,13 @@ def test_store_backfill_file(self):
         assert fn not in os.listdir(backfill_dir)
 
     def test_merge_backfill_file(self):
-
-        today = datetime.today()
-
-        fn = "claims_hosp_from_20200611_to_20200614.parquet"
+        fn = "claims_hosp_202006.parquet"
         assert fn not in os.listdir(backfill_dir)
 
         # Check when there is no daily file to merge.
         today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today.weekday(), today,
-                            test_mode=True, check_nd=8)
+        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+                            test_mode=True)
         assert fn not in os.listdir(backfill_dir)
 
         # Generate backfill daily files
@@ -66,15 +62,10 @@ def test_merge_backfill_file(self):
             dropdate = datetime(2020, 6, d)
             store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
 
-        # Check the when the merged file is not generated
-        today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today.weekday(), today,
-                            test_mode=True, check_nd=8)
-        assert fn not in os.listdir(backfill_dir)
-
-        # Generate the merged file, but not delete it
-        merge_backfill_file(backfill_dir, today.weekday(), today,
-                            test_mode=True, check_nd=2)
+        # Check when the merged file is not generated
+        today = datetime(2020, 7, 1)
+        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+                            test_mode=True)
         assert fn in os.listdir(backfill_dir)
 
         # Read daily file
@@ -112,15 +103,15 @@ def prep_backfill_data():
 
             today = datetime(2020, 6, 14)
             # creating expected file
-            merge_backfill_file(backfill_dir, today.weekday(), today,
-                                test_mode=True, check_nd=2)
+            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+                                test_mode=True)
             original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
             os.rename(original, f"{backfill_dir}/expected.parquet")
 
             # creating backfill without issue date
             os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
             today = datetime(2020, 6, 14)
-            merge_backfill_file(backfill_dir, today.weekday(), today,
+            merge_backfill_file(backfill_dir, today,
                                 test_mode=True, check_nd=2)
 
             old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
@@ -141,7 +132,7 @@ def prep_backfill_data():
 
     def test_merge_existing_backfill_files_no_call(self):
-        issue_date = datetime(year=2020, month=6, day=20)
+        issue_date = datetime(year=2020, month=5, day=20)
         issue_date_str = issue_date.strftime("%Y%m%d")
         def prep_backfill_data():
             # Generate backfill daily files
@@ -151,8 +142,8 @@ def prep_backfill_data():
 
             today = datetime(2020, 6, 14)
             # creating expected file
-            merge_backfill_file(backfill_dir, today.weekday(), today,
-                                test_mode=True, check_nd=8)
+            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+                                test_mode=True)

From eed2a638041de5f33b806b6ff44372352ecc8bf7 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Thu, 17 Oct 2024 11:28:10 -0400
Subject: [PATCH 10/14] adjusting logic to match new naming format and chunking

---
 claims_hosp/tests/test_backfill.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index ec0de4e5c..661946436 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -69,7 +69,7 @@ def test_merge_backfill_file(self):
         assert fn in os.listdir(backfill_dir)
 
         # Read daily file
-        new_files = glob.glob(backfill_dir + "/claims_hosp*.parquet")
+        new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet")
         pdList = []
         for file in new_files:
             if "from" in file:

From 65a06d8d89581b6307227bb0f9d994a4e0e640d2 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Mon, 21 Oct 2024 13:54:28 -0400
Subject: [PATCH 11/14] added logging and more clean up

---
 claims_hosp/delphi_claims_hosp/backfill.py |  37 ++++---
 claims_hosp/tests/test_backfill.py         | 122 ++++++++++++---------
 2 files changed, 92 insertions(+), 67 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index ff1bb6877..07db1827b 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -22,7 +22,7 @@
 gmpr = GeoMapper()
 
-def store_backfill_file(claims_filepath, _end_date, backfill_dir):
+def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.
 
@@ -57,6 +57,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                             & (~backfilldata["fips"].isnull()),
                             selected_columns]
+    logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date)
     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
 
@@ -69,10 +70,15 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
         "state_id": "string"
     })
 
-    path = backfill_dir + \
-        "/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    path = f"{backfill_dir}/{filename}"
+
     # Store intermediate file into the backfill folder
-    backfilldata.to_parquet(path, index=False)
+    try:
+        backfilldata.to_parquet(path, index=False)
+        logger.info("Stored backfill data in parquet", filename=filename)
+    except:
+        logger.info("Failed to store backfill data in parquet", )
     return path
 
@@ -90,26 +96,29 @@ def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logge
     backfill_dir : str
         specified path to store backfill files.
     backfill_file : str
+        specific file add to merged backfill file.
     """
     new_files = glob.glob(backfill_dir + "/claims_hosp_*")
 
     def get_file_with_date(files) -> Union[str, None]:
         for filename in files:
-            pattern = re.findall(r"\d{6}", filename)
-            if len(pattern) == 1:
-                file_month = datetime.strptime(pattern[0], "%Y%m")
-                start_date = file_month.replace(day=1)
-                end_date = (start_date + timedelta(days=32)).replace(day=1)
-                if issue_date >= start_date and issue_date < end_date:
+            # need to only match files with 6 digits for merged files
+            pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
+            if pattern:
+                file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
+                end_date = (file_month + timedelta(days=32)).replace(day=1)
+                if issue_date >= file_month and issue_date < end_date:
                     return filename
         return ""
 
     file_name = get_file_with_date(new_files)
 
     if len(file_name) == 0:
-        logger.info("patch file is too recent to merge", issue_date=issue_date.strftime("%Y-%m-%d"))
+        logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return
 
+    logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name)
+
     # Start to merge files
     merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
     try:
@@ -139,8 +148,10 @@
     backfill_dir : str
         specified path to store backfill files.
     test_mode: bool
     """
-    new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
+    previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
+    new_files = glob.glob(backfill_dir + f"/claims_hosp_as_of_{previous_month}*")
     if len(new_files) == 0: # if no any daily file is stored
+        logger.info("No new files to merge; skipping merging")
         return
 
     def get_date(file_link):
@@ -155,7 +166,7 @@ def get_date(file_link):
     date_list = list(map(get_date, new_files))
     latest_date = max(date_list)
     if latest_date.month == most_recent.month:
         logger.info("Not a new month; skipping merging")
         return
-
+    logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1])
     # Start to merge files
     pdList = []
     for fn in new_files:
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index 661946436..a72c3496e 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -9,6 +9,7 @@
 import pytest
 
 # first party
+from delphi_utils.logger import get_structured_logger
 from delphi_claims_hosp.config import Config, GeoConstants
 from delphi_claims_hosp.backfill import store_backfill_file, merge_backfill_file, merge_existing_backfill_files
 
@@ -25,55 +26,65 @@
 DATA_FILEPATH = PARAMS["indicator"]["input_file"]
 DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"])
 backfill_dir = PARAMS["indicator"]["backfill_dir"]
-TEST_LOGGER = logging.getLogger()
 
 class TestBackfill:
-    def test_store_backfill_file(self):
-        dropdate = datetime(2020, 1, 1)
+    def cleanup(self):
+        for file in glob.glob(f"{backfill_dir}/*.parquet"):
+            os.remove(file)
+
+    def test_store_backfill_file(self, caplog):
+        dropdate = datetime(2020, 1, 1)
         fn = "claims_hosp_as_of_20200101.parquet"
-        assert fn not in os.listdir(backfill_dir)
-
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+        num_rows = len(pd.read_csv(DATA_FILEPATH))
+
         # Store backfill file
-        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+        store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
         assert fn in os.listdir(backfill_dir)
+        assert "Stored backfill data in parquet" in caplog.text
+
+        fn = "claims_hosp_as_of_20200101.parquet"
         backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow')
-
+
         selected_columns = ['time_value', 'fips', 'state_id',
                             'num', 'den', 'lag', 'issue_date']
-        assert set(selected_columns) == set(backfill_df.columns)
-
-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
+
+        assert set(selected_columns) == set(backfill_df.columns)
+        assert num_rows == len(backfill_df)
+
+        self.cleanup()
 
-    def test_merge_backfill_file(self):
+    def test_merge_backfill_file(self, caplog, monkeypatch):
         fn = "claims_hosp_202006.parquet"
-        assert fn not in os.listdir(backfill_dir)
-
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
         # Check when there is no daily file to merge.
         today = datetime(2020, 6, 14)
-        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+        merge_backfill_file(backfill_dir, today, logger,
                             test_mode=True)
         assert fn not in os.listdir(backfill_dir)
-
-        # Generate backfill daily files
+        assert "No new files to merge; skipping merging" in caplog.text
+
+
+        # Generate backfill daily files
         for d in range(11, 15):
-            dropdate = datetime(2020, 6, d)
-            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
-
-        # Check when the merged file is not generated
+            dropdate = datetime(2020, 6, d)
+            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
+
         today = datetime(2020, 7, 1)
-        merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+        merge_backfill_file(backfill_dir, today, logger,
                             test_mode=True)
+        assert "Merging files" in caplog.text
         assert fn in os.listdir(backfill_dir)
 
         # Read daily file
         new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*.parquet")
-        pdList = []
+        pdList = []
         for file in new_files:
-            if "from" in file:
-                continue
             df = pd.read_parquet(file, engine='pyarrow')
             pdList.append(df)
             os.remove(file)
@@ -92,80 +103,80 @@
         assert len(new_files) == 1
 
         expected = pd.concat(pdList).sort_values(["time_value", "fips"])
-
+
         # Read the merged file
         merged = pd.read_parquet(backfill_dir + "/" + fn, engine='pyarrow')
-
+
         assert set(expected.columns) == set(merged.columns)
         assert expected.shape[0] == merged.shape[0]
         assert expected.shape[1] == merged.shape[1]
-
-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
 
-    def test_merge_existing_backfill_files(self):
+        self.cleanup()
+
+    def test_merge_existing_backfill_files(self, caplog):
         issue_date = datetime(year=2020, month=6, day=13)
         issue_date_str = issue_date.strftime("%Y%m%d")
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
         def prep_backfill_data():
             # Generate backfill daily files
             for d in range(11, 15):
                 dropdate = datetime(2020, 6, d)
-                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
 
-            today = datetime(2020, 6, 14)
+            today = datetime(2020, 7, 1)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+            merge_backfill_file(backfill_dir, today, logger,
                                 test_mode=True)
-            original = f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet"
+            original = f"{backfill_dir}/claims_hosp_202006.parquet"
             os.rename(original, f"{backfill_dir}/expected.parquet")
 
             # creating backfill without issue date
             os.remove(f"{backfill_dir}/claims_hosp_as_of_{issue_date_str}.parquet")
-            today = datetime(2020, 6, 14)
-            merge_backfill_file(backfill_dir, today,
-                                test_mode=True, check_nd=2)
+            merge_backfill_file(backfill_dir, today, logger,
+                                test_mode=True)
 
             old_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
             for file in old_files:
                 os.remove(file)
 
         prep_backfill_data()
-        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
-        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+
+        assert "Adding missing date to merged file" in caplog.text
 
         expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
-        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_from_20200611_to_20200614.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/claims_hosp_202006.parquet")
 
-        check_diff = expected.merge(merged, how='left', indicator=True)
-        assert check_diff[check_diff["_merge"] == "both"].shape[0] == expected.shape[0]
-        for file in glob.glob(backfill_dir + "/*.parquet"):
-            os.remove(file)
+        check = pd.concat([merged, expected]).drop_duplicates(keep=False)
+        assert len(check) == 0
 
+        self.cleanup()
+
 
-    def test_merge_existing_backfill_files_no_call(self):
+    def test_merge_existing_backfill_files_no_call(self, caplog):
         issue_date = datetime(year=2020, month=5, day=20)
-        issue_date_str = issue_date.strftime("%Y%m%d")
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
         def prep_backfill_data():
             # Generate backfill daily files
             for d in range(11, 15):
                 dropdate = datetime(2020, 6, d)
-                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir)
+                store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
 
             today = datetime(2020, 6, 14)
             # creating expected file
-            merge_backfill_file(backfill_dir, today, TEST_LOGGER,
+            merge_backfill_file(backfill_dir, today, logger,
                                 test_mode=True)
 
         prep_backfill_data()
-        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir)
-        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, TEST_LOGGER)
+        file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+        assert "Issue date has no matching merged files" in caplog.text
-
-        old_files = glob.glob(backfill_dir + "*.parquet")
-        for file in old_files:
-            os.remove(file)
+        self.cleanup()

From 6995c8ae87a2aff156d4f3ce89f5ecff7d518790 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Mon, 28 Oct 2024 11:51:56 -0400
Subject: [PATCH 12/14] added conditional for merging

---
 claims_hosp/delphi_claims_hosp/backfill.py | 12 +++++----
 claims_hosp/tests/test_backfill.py         | 29 ++++++++++++++++++++--
 2 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index 07db1827b..b138767e2 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -12,6 +12,7 @@
 import shutil
 from datetime import datetime, timedelta
 from typing import Union
+import calendar
 
 # third party
 import pandas as pd
@@ -57,7 +58,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
                             & (~backfilldata["fips"].isnull()),
                             selected_columns]
-    logger.info("Filtering backfill data", startdate=_start_date, enddate=_end_date)
+    logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)
     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
 
@@ -76,9 +77,9 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     # Store intermediate file into the backfill folder
     try:
         backfilldata.to_parquet(path, index=False)
-        logger.info("Stored backfill data in parquet", filename=filename)
+        logger.info("Stored source data in parquet", filename=filename)
     except:
-        logger.info("Failed to store backfill data in parquet", )
+        logger.info("Failed to store source data in parquet")
     return path
 
@@ -162,8 +163,9 @@
     date_list = list(map(get_date, new_files))
     latest_date = max(date_list)
-    if latest_date.month == most_recent.month:
-        logger.info("Not a new month; skipping merging")
+    num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
+    if len(date_list) < num_of_days_in_month:
+        logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return
 
diff --git a/claims_hosp/tests/test_backfill.py b/claims_hosp/tests/test_backfill.py
index a72c3496e..acd95671b 100644
--- a/claims_hosp/tests/test_backfill.py
+++ b/claims_hosp/tests/test_backfill.py
@@ -1,3 +1,4 @@
+import calendar
 import logging
 import os
 import glob
@@ -57,7 +58,7 @@ def test_store_backfill_file(self, caplog):
 
         self.cleanup()
 
-    def test_merge_backfill_file(self, caplog):
+    def test_merge_backfill_file(self, caplog, monkeypatch):
         fn = "claims_hosp_202006.parquet"
         caplog.set_level(logging.INFO)
         logger = get_structured_logger()
@@ -76,6 +77,7 @@ def test_merge_backfill_file(self, caplog, monkeypatch):
             store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
 
         today = datetime(2020, 7, 1)
+        monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
         merge_backfill_file(backfill_dir, today, logger,
                             test_mode=True)
         assert "Merging files" in caplog.text
@@ -102,6 +104,29 @@
 
         self.cleanup()
 
+    def test_merge_backfill_file_no_call(self, caplog):
+        fn = "claims_hosp_202006.parquet"
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
+        # Check when there is no daily file to merge.
+        today = datetime(2020, 6, 14)
+        merge_backfill_file(backfill_dir, today, logger,
+                            test_mode=True)
+        assert fn not in os.listdir(backfill_dir)
+        assert "No new files to merge; skipping merging" in caplog.text
+
+        # Generate backfill daily files
+        for d in range(11, 15):
+            dropdate = datetime(2020, 6, d)
+            store_backfill_file(DATA_FILEPATH, dropdate, backfill_dir, logger)
+
+        today = datetime(2020, 7, 1)
+        merge_backfill_file(backfill_dir, today, logger,
+                            test_mode=True)
+        assert "Not enough days, skipping merging" in caplog.text
+        self.cleanup()
+
     def test_merge_existing_backfill_files(self, caplog):
         issue_date = datetime(year=2020, month=6, day=13)
         issue_date_str = issue_date.strftime("%Y%m%d")
@@ -164,8 +189,8 @@ def prep_backfill_data():
         file_to_add = store_backfill_file(DATA_FILEPATH, issue_date, backfill_dir, logger)
         merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
         assert "Issue date has no matching merged files" in caplog.text
-        self.cleanup()
+        self.cleanup()
 

From 1666e0ccf54e4a194698ef45c090017f9927a758 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 30 Oct 2024 12:30:23 -0400
Subject: [PATCH 13/14] lint

---
 claims_hosp/delphi_claims_hosp/backfill.py | 16 +++++++++-------
 claims_hosp/delphi_claims_hosp/run.py      |  7 +++----
 2 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/claims_hosp/delphi_claims_hosp/backfill.py b/claims_hosp/delphi_claims_hosp/backfill.py
index b138767e2..836338223 100644
--- a/claims_hosp/delphi_claims_hosp/backfill.py
+++ b/claims_hosp/delphi_claims_hosp/backfill.py
@@ -6,23 +6,23 @@
 """
 
+import calendar
 import glob
 import os
 import re
 import shutil
 from datetime import datetime, timedelta
 from typing import Union
-import calendar
 
 # third party
 import pandas as pd
-import pytz
 from delphi_utils import GeoMapper
 
 from .config import Config
 
 gmpr = GeoMapper()
 
+
 def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.
@@ -71,14 +71,14 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
         "state_id": "string"
     })
 
-    filename = "claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
     path = f"{backfill_dir}/{filename}"
 
     # Store intermediate file into the backfill folder
     try:
         backfilldata.to_parquet(path, index=False)
         logger.info("Stored source data in parquet", filename=filename)
-    except:
+    except:  # pylint: disable=W0702
         logger.info("Failed to store source data in parquet")
     return path
 
@@ -108,7 +108,7 @@ def get_file_with_date(files) -> Union[str, None]:
             if pattern:
                 file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
                 end_date = (file_month + timedelta(days=32)).replace(day=1)
-                if issue_date >= file_month and issue_date < end_date:
+                if file_month <= issue_date < end_date:
                     return filename
         return ""
 
@@ -118,7 +118,9 @@ def get_file_with_date(files) -> Union[str, None]:
         logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
         return
 
-    logger.info("Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name)
+    logger.info(
+        "Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name
+    )
 
     # Start to merge files
     merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
@@ -170,7 +172,7 @@
         logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return
 
-    logger.info(f"Merging files", start_date=date_list[0], end_date=date_list[-1])
+    logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
     # Start to merge files
     pdList = []
     for fn in new_files:
diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py
index fe8d76fda..0d24192c0 100644
--- a/claims_hosp/delphi_claims_hosp/run.py
+++ b/claims_hosp/delphi_claims_hosp/run.py
@@ -103,14 +103,13 @@ def run_module(params, logger=None):
     # Store backfill data
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
-        backfill_merge_day = params["indicator"]["backfill_merge_day"]
         if custom_run_flag:
-            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+            backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
             merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)
 
         else:
-            merge_backfill_file(backfill_dir, datetime.today())
-            store_backfill_file(claims_file, dropdate_dt, backfill_dir)
+            merge_backfill_file(backfill_dir, datetime.today(), logger)
+            store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
 
     # print out information
     logger.info("Loaded params",

From 98d631a9a9e2a73eab1fbaccbc68dc823cd16261 Mon Sep 17 00:00:00 2001
From: Amaris Sim
Date: Wed, 30 Oct 2024 13:04:49 -0400
Subject: [PATCH 14/14] remove unrelated file

---
 changehc/delphi_changehc/patch.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 changehc/delphi_changehc/patch.py

diff --git a/changehc/delphi_changehc/patch.py b/changehc/delphi_changehc/patch.py
deleted file mode 100644
index e69de29bb..000000000
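Note: after the final commit, daily backfill files are merged per calendar month, and only once a full month of daily files is present. A minimal sketch of that decision rule, assuming a hypothetical June 2020 directory listing (the real merge_backfill_file additionally restricts the glob to the previous month's files):

    import calendar
    from datetime import datetime

    # hypothetical daily files, one per day of June 2020
    daily_files = [f"claims_hosp_as_of_202006{day:02d}.parquet" for day in range(1, 31)]
    dates = [datetime.strptime(name[-16:-8], "%Y%m%d") for name in daily_files]

    latest = max(dates)
    days_in_month = calendar.monthrange(latest.year, latest.month)[1]  # 30 for June
    if len(dates) < days_in_month:
        print("Not enough days, skipping merging")  # mirrors the log message above
    else:
        print(f"merging into claims_hosp_{latest:%Y%m}.parquet")  # monthly file name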