Automate hospital admission patch #2043

Open
wants to merge 15 commits into base: main
117 changes: 87 additions & 30 deletions claims_hosp/delphi_claims_hosp/backfill.py
@@ -5,20 +5,25 @@
Created: 2022-08-03

"""
import os

import calendar
import glob
from datetime import datetime
import os
import re
import shutil
from datetime import datetime, timedelta
from typing import Union

# third party
import pandas as pd
from delphi_utils import GeoMapper


from .config import Config

gmpr = GeoMapper()

def store_backfill_file(claims_filepath, _end_date, backfill_dir):

def store_backfill_file(claims_filepath, _end_date, backfill_dir, logger):
"""
Store county level backfill data into backfill_dir.

@@ -53,6 +58,7 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
backfilldata = backfilldata.loc[(backfilldata["time_value"] >= _start_date)
& (~backfilldata["fips"].isnull()),
selected_columns]
logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)

backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
@@ -65,36 +71,90 @@ def store_backfill_file(claims_filepath, _end_date, backfill_dir):
"state_id": "string"
})

path = backfill_dir + \
"/claims_hosp_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
filename = "claims_hosp_as_of_%s.parquet" % datetime.strftime(_end_date, "%Y%m%d")
path = f"{backfill_dir}/{filename}"

# Store intermediate file into the backfill folder
backfilldata.to_parquet(path, index=False)
try:
backfilldata.to_parquet(path, index=False)
logger.info("Stored source data in parquet", filename=filename)
except: # pylint: disable=W0702
logger.info("Failed to store source data in parquet")
return path


def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
aysim319 marked this conversation as resolved.
jingjtang (Contributor), Sep 2, 2024: Could you add more explanations to this function? Otherwise people can easily get confused by this one and the function below.
aysim319 (Author): Hopefully the explanation I had made sense.

"""

Merge existing backfill with the patch data included. This function is specifically run for patching.

When the indicator fails for some reason, a gap is left in the backfill files.
The patch that fills in the missing dates runs later in the pipeline, after the backfill files have already been merged.
This function takes the merged file that is missing that date, inserts the data for the missing date, and writes the merged file back out.

Parameters
----------
issue_date : datetime
The most recent date when the raw data is received
backfill_dir : str
specified path to store backfill files.
backfill_file : str
specific file to add to the merged backfill file.
"""
new_files = glob.glob(backfill_dir + "/claims_hosp_*")

def get_file_with_date(files) -> Union[str, None]:
for filename in files:
# need to only match files with 6 digits for merged files
pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
if pattern:
file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
end_date = (file_month + timedelta(days=32)).replace(day=1)
if file_month <= issue_date < end_date:
return filename
return ""

file_name = get_file_with_date(new_files)

if len(file_name) == 0:
logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d"))
return

logger.info(
"Adding missing date to merged file", issue_date=issue_date, filename=backfill_file, merged_filename=file_name
)

# Start to merge files
merge_file = f"{file_name.split('.')[0]}_after_merge.parquet"
try:
shutil.copyfile(file_name, merge_file)
existing_df = pd.read_parquet(merge_file, engine="pyarrow")
df = pd.read_parquet(backfill_file, engine="pyarrow")
merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"])
merged_df.to_parquet(merge_file, index=False)
os.remove(file_name)
os.rename(merge_file, file_name)
# pylint: disable=W0703
except Exception as e:
os.remove(merge_file)
logger.error(e)
return
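
A minimal, self-contained sketch of the month-window matching that get_file_with_date performs above; the filenames and the issue date here are hypothetical:

import re
from datetime import datetime, timedelta

files = ["claims_hosp_202404.parquet", "claims_hosp_as_of_20240420.parquet"]
issue_date = datetime(2024, 4, 20)
for filename in files:
    # merged monthly files carry exactly six digits (YYYYMM); daily files never match the pattern
    pattern = re.findall(r"_(\d{6,6})\.parquet", filename)
    if pattern:
        file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1)
        end_date = (file_month + timedelta(days=32)).replace(day=1)
        print(filename, file_month <= issue_date < end_date)
# prints: claims_hosp_202404.parquet True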


def merge_backfill_file(backfill_dir, backfill_merge_day, today,
test_mode=False, check_nd=25):
def merge_backfill_file(backfill_dir, most_recent, logger, test_mode=False):
"""
Merge ~4 weeks' backfill data into one file.
Merge a month's source data into one file.

Usually this function should merge 28 days' data into a new file so as to
save the reading time when running the backfill pipelines. We set a softer
threshold to allow flexibility in data delivery.
Parameters
----------
today : datetime
most_recent : datetime
The most recent date when the raw data is received
backfill_dir : str
specified path to store backfill files.
backfill_merge_day: int
The day of a week that we used to merge the backfill files. e.g. 0
is Monday.
test_mode: bool
check_nd: int
The criteria of the number of unmerged files. Ideally, we want the
number to be 28, but we use a looser criteria from practical
considerations
"""
new_files = glob.glob(backfill_dir + "/claims_hosp_as_of_*")
previous_month = (most_recent.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
new_files = glob.glob(backfill_dir + f"/claims_hosp_as_of_{previous_month}*")
if len(new_files) == 0:  # if no daily file is stored
logger.info("No new files to merge; skipping merging")
return

def get_date(file_link):
@@ -104,23 +164,20 @@ def get_date(file_link):
return datetime.strptime(fn, "%Y%m%d")

date_list = list(map(get_date, new_files))
earliest_date = min(date_list)
latest_date = max(date_list)

# Check whether to merge
# Check the number of files that are not merged
if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
if len(date_list) < num_of_days_in_month:
logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
return

logger.info("Merging files", start_date=date_list[0], end_date=date_list[-1])
# Start to merge files
pdList = []
for fn in new_files:
df = pd.read_parquet(fn, engine='pyarrow')
pdList.append(df)
merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
path = backfill_dir + "/claims_hosp_from_%s_to_%s.parquet"%(
datetime.strftime(earliest_date, "%Y%m%d"),
datetime.strftime(latest_date, "%Y%m%d"))
path = f"{backfill_dir}/claims_hosp_{datetime.strftime(latest_date, '%Y%m')}.parquet"
merged_file.to_parquet(path, index=False)

# Delete daily files once we have the merged one.
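
A small illustration of the completeness check in merge_backfill_file above, with a hypothetical month of daily files; the merge only proceeds once there is a daily file for every day of the month:

import calendar
from datetime import datetime

date_list = [datetime(2024, 4, d) for d in range(1, 31)]  # 30 hypothetical daily files for April 2024
latest_date = max(date_list)
num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
print(len(date_list) >= num_of_days_in_month)  # True -> the month is complete, so the merge runs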
5 changes: 3 additions & 2 deletions claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py
@@ -53,9 +53,10 @@ def change_date_format(name):
return name


def download(ftp_credentials, out_path, logger):
def download(ftp_credentials, out_path, logger, issue_date=None):
"""Pull the latest raw files."""
current_time = datetime.datetime.now()
current_time = issue_date if issue_date else datetime.datetime.now()

seconds_in_day = 24 * 60 * 60
logger.info("Starting download")

8 changes: 4 additions & 4 deletions claims_hosp/delphi_claims_hosp/get_latest_claims_name.py
@@ -5,9 +5,9 @@
import datetime
from pathlib import Path

def get_latest_filename(dir_path, logger):
def get_latest_filename(dir_path, logger, issue_date=None):
"""Get the latest filename from the list of downloaded raw files."""
current_date = datetime.datetime.now()
current_date = issue_date if issue_date else datetime.datetime.now()
files = list(Path(dir_path).glob("*"))

latest_timestamp = datetime.datetime(1900, 1, 1)
@@ -23,8 +23,8 @@ def get_latest_filename(dir_path, logger):
if timestamp <= current_date:
latest_timestamp = timestamp
latest_filename = file

assert current_date.date() == latest_timestamp.date(), "no drop for today"
if issue_date is None:
assert current_date.date() == latest_timestamp.date(), "no drop for today"
A reviewer (Contributor) commented on lines +26 to +27:

Suggested change:
-    if issue_date is None:
-        assert current_date.date() == latest_timestamp.date(), "no drop for today"
+    assert current_date.date() == latest_timestamp.date(), "no drop for today"

This is fine without the issue_date check, since there might be times when a date (or more) in a patch date range really has no source drop on that date.
Since latest_timestamp only grabs the latest timestamp in the input_dir, not on the ftp server, and the patch code downloads files into input_dir one issue date at a time, the old assert would still do what it's supposed to do in the patching context.


logger.info("Latest claims file", filename=latest_filename)

75 changes: 75 additions & 0 deletions claims_hosp/delphi_claims_hosp/patch.py
@@ -0,0 +1,75 @@
"""
This module is used for patching data in the delphi_claims_hosp package.

To use this module, you need to specify the range of issue dates in params.json, like so:

{
"common": {
"custom_flag" : true,
...
},
"validation": {
...
},
"patch": {
"patch_dir": "/covidcast-indicators/hopspital-admissions/patch",
"start_issue": "2024-04-20",
"end_issue": "2024-04-21"
}
}

It will generate data for that range of issue dates, and store them in batch issue format:
[name-of-patch]/issue_[issue-date]/hospital-admissions/actual_data_file.csv
"""
A reviewer (Contributor) commented:
Apart from the docstring here, I suggest adding some patching instructions like this in the indicator readme too.
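
A sketch of what such readme instructions might look like (the "patch" block mirrors the module docstring above; the directory value and the exact invocation are illustrative assumptions):

To patch a range of issue dates, add a "patch" block to params.json, e.g.
    "patch": {
        "patch_dir": "./patch_dir",
        "start_issue": "2024-04-20",
        "end_issue": "2024-04-21"
    }
then run the patch module directly (assuming the package is installed in the indicator's environment):
    python -m delphi_claims_hosp.patch
Each issue date is written to [patch_dir]/issue_[YYYYMMDD]/hospital-admissions/.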


from datetime import datetime, timedelta
from os import makedirs

from delphi_utils import get_structured_logger, read_params

from .run import run_module


def patch():
"""
Run the hospital-admissions indicator for a range of issue dates.

The range of issue dates is specified in params.json using the following keys:
- "patch": Only used for patching data
- "start_date": str, YYYY-MM-DD format, first issue date
- "end_date": str, YYYY-MM-DD format, last issue date
- "patch_dir": str, directory to write all issues output
"""
params = read_params()
logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])

start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

logger.info(
"Starting patching",
patch_directory=params["patch"]["patch_dir"],
start_issue=start_issue.strftime("%Y-%m-%d"),
end_issue=end_issue.strftime("%Y-%m-%d"),
)

makedirs(params["patch"]["patch_dir"], exist_ok=True)

current_issue = start_issue

while current_issue <= end_issue:
logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))

params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d")

current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/hospital-admissions"""
makedirs(f"{current_issue_dir}", exist_ok=True)
params["common"]["export_dir"] = f"""{current_issue_dir}"""

run_module(params, logger)
current_issue += timedelta(days=1)


if __name__ == "__main__":
patch()
40 changes: 27 additions & 13 deletions claims_hosp/delphi_claims_hosp/run.py
@@ -5,25 +5,27 @@
when the module is run with `python -m delphi_claims_hosp`.
"""

import os

# standard packages
import time
import os
from datetime import datetime, timedelta
from pathlib import Path

# third party
from delphi_utils import get_structured_logger

from .backfill import merge_backfill_file, merge_existing_backfill_files, store_backfill_file

# first party
from .config import Config
from .download_claims_ftp_files import download
from .modify_claims_drops import modify_and_write
from .get_latest_claims_name import get_latest_filename
from .modify_claims_drops import modify_and_write
from .update_indicator import ClaimsHospIndicatorUpdater
from .backfill import (store_backfill_file, merge_backfill_file)


def run_module(params):
def run_module(params, logger=None):
"""
Generate updated claims-based hospitalization indicator values.

@@ -54,19 +56,27 @@
adjustments (False).
"""
start_time = time.time()
logger = get_structured_logger(
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
issue_date_str = params.get("patch", {}).get("current_issue", None)
issue_date = datetime.strptime(issue_date_str, "%Y-%m-%d") if issue_date_str else None
# safety check: the patch parameters may exist in the params file even when this is not a custom run/patch
custom_run_flag = params["indicator"].get("custom_run", False)
if not logger:
logger = get_structured_logger(
__name__,
filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True),
)

jingjtang (Contributor), Sep 2, 2024: Should this be "if logger:"?
aysim319 (Author): Nope. The patching code is a wrapper that calls the run script in a for loop with some customization; to keep patch logging separate from a regular run, patch() creates a logger and passes it in as a parameter, while a regular run does not. So the logic is: if a logger was already passed in, it came from patch(); if not, we need to create one here.
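
A short sketch of the two call paths described in that exchange (call shapes taken from this PR; loading params here is just for illustration):

from delphi_utils import get_structured_logger, read_params
from delphi_claims_hosp.run import run_module

params = read_params()

# regular run: no logger is passed, so run_module builds its own from params["common"]
run_module(params)

# patch-style run: the caller (patch()) builds the logger once and hands it to every run_module call
logger = get_structured_logger("delphi_claims_hosp.patch", filename=params["common"]["log_filename"])
run_module(params, logger)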

# pull latest data
download(params["indicator"]["ftp_credentials"],
params["indicator"]["input_dir"], logger)
download(params["indicator"]["ftp_credentials"], params["indicator"]["input_dir"], logger, issue_date=issue_date)

# aggregate data
modify_and_write(params["indicator"]["input_dir"], logger)

# find the latest files (these have timestamps)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger)
claims_file = get_latest_filename(params["indicator"]["input_dir"], logger, issue_date=issue_date)

# handle range of estimates to produce
# filename expected to have format: EDI_AGG_INPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz
@@ -93,9 +103,13 @@
# Store backfill data
if params["indicator"].get("generate_backfill_files", True):
backfill_dir = params["indicator"]["backfill_dir"]
backfill_merge_day = params["indicator"]["backfill_merge_day"]
merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
store_backfill_file(claims_file, dropdate_dt, backfill_dir)
if custom_run_flag:
backfilled_filepath = store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)
merge_existing_backfill_files(backfill_dir, backfilled_filepath, issue_date, logger)

else:
merge_backfill_file(backfill_dir, datetime.today(), logger)
store_backfill_file(claims_file, dropdate_dt, backfill_dir, logger)

# print out information
logger.info("Loaded params",