From 3450dfc09ff8c4bbc9d89c28ddc78b81f52e2ee6 Mon Sep 17 00:00:00 2001
From: nmdefries <42820733+nmdefries@users.noreply.github.com>
Date: Mon, 28 Oct 2024 08:21:11 -0400
Subject: [PATCH] Add `nchs-mortality` raw data backups and backup export
 utility (#2065)

* add helper fn in utils to save backup data to csv

* use helper to save nchs data to disk right after pulling

* add backup dir param

* import backup utility

* update arg name

* add gzip + fix old json template log + remove table_name

* fix current tests to take backup dirs and custom_run flag into account

* add logging

* fix log getsize of backup file

* lint

* lint

* lint

* lint

* add backup test

* remove deep copy

---------

Co-authored-by: minhkhul
Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>
---
 _delphi_utils_python/delphi_utils/__init__.py |  13 +-
 _delphi_utils_python/delphi_utils/export.py   |  75 ++++++++++-
 .../nchs_mortality-params-prod.json.j2        |   1 +
 nchs_mortality/delphi_nchs_mortality/pull.py  |  22 +++-
 nchs_mortality/delphi_nchs_mortality/run.py   |   6 +-
 nchs_mortality/params.json.template           |   3 +-
 nchs_mortality/raw_data_backups/.gitignore    | 120 ++++++++++++++++++
 nchs_mortality/tests/conftest.py              |   4 +-
 .../tests/raw_data_backups/.gitignore         |   2 +
 nchs_mortality/tests/test_pull.py             |  18 ++-
 10 files changed, 245 insertions(+), 19 deletions(-)
 create mode 100644 nchs_mortality/raw_data_backups/.gitignore
 create mode 100644 nchs_mortality/tests/raw_data_backups/.gitignore

diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py
index 7a418551d..ca5693eaf 100644
--- a/_delphi_utils_python/delphi_utils/__init__.py
+++ b/_delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
 from __future__ import absolute_import
 
 from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
-from .export import create_export_csv
-from .utils import read_params
-
-from .slack_notifier import SlackNotifier
-from .logger import get_structured_logger
+from .export import create_backup_csv, create_export_csv
 from .geomap import GeoMapper
-from .smooth import Smoother
-from .signal import add_prefix
+from .logger import get_structured_logger
 from .nancodes import Nans
+from .signal import add_prefix
+from .slack_notifier import SlackNotifier
+from .smooth import Smoother
+from .utils import read_params
 from .weekday import Weekday
 
 __version__ = "0.3.25"
diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 8ac5de48e..82493032e 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -1,16 +1,18 @@
 """Export data in the format expected by the Delphi API."""
 # -*- coding: utf-8 -*-
+import gzip
+import logging
 from datetime import datetime
-from os.path import join
+from os.path import getsize, join
 from typing import Optional
-import logging
 
-from epiweeks import Week
 import numpy as np
 import pandas as pd
+from epiweeks import Week
 
 from .nancodes import Nans
 
+
 def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
     """Find values with contradictory missingness codes, filter them, and log."""
     columns = ["val", "se", "sample_size"]
@@ -132,3 +134,70 @@ def create_export_csv(
         export_df = export_df.sort_values(by="geo_id")
         export_df.to_csv(export_file, index=False, na_rep="NA")
     return dates
+
+
+def create_backup_csv(
+    df: pd.DataFrame,
+    backup_dir: str,
+    custom_run: bool,
+    issue: Optional[str] = None,
+    geo_res: Optional[str] = None,
+    sensor: Optional[str] = None,
+    metric: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
+):
+    """Save data for use as a backup.
+
+    This function is meant to save raw data fetched from data sources.
+    Therefore, it avoids manipulating the data as much as possible to
+    preserve the input.
+
+    When only required arguments are passed, data will be saved to a file of
+    the format `<backup_dir>/<issue>.csv.gz`. Optional arguments
+    should be passed if the source data is fetched from different tables or
+    in batches by signal, geo, etc.
+
+    Parameters
+    ----------
+    df: pd.DataFrame
+        Columns: geo_id, timestamp, val, se, sample_size
+    backup_dir: str
+        Backup directory
+    custom_run: bool
+        Flag indicating if the current run is a patch, or other run where
+        backups aren't needed. If so, don't save any data to disk.
+    issue: Optional[str]
+        The date the data was fetched, in YYYYMMDD format. Defaults to "today"
+        if not provided.
+    geo_res: Optional[str]
+        Geographic resolution of the data.
+    sensor: Optional[str]
+        Sensor that has been calculated (cumulative_counts vs new_counts).
+    metric: Optional[str]
+        Metric we are considering, if any.
+    logger: Optional[logging.Logger]
+        Pass a logger object here to log information about name and size of the backup file.
+
+    Returns
+    -------
+    None
+        The backup file is written as a side effect; nothing is returned.
+    """
+    if not custom_run:
+        # Label the file with today's date (the date the data was fetched).
+        if not issue:
+            issue = datetime.today().strftime("%Y%m%d")
+
+        backup_filename = [issue, geo_res, metric, sensor]
+        backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
+        backup_file = join(backup_dir, backup_filename)
+
+        with gzip.open(backup_file, "wt", newline="") as f:
+            df.to_csv(f, index=False, na_rep="NA")
+
+        if logger:
+            logger.info(
+                "Backup file created",
+                backup_file=backup_file,
+                backup_size=getsize(backup_file),
+            )
diff --git a/ansible/templates/nchs_mortality-params-prod.json.j2 b/ansible/templates/nchs_mortality-params-prod.json.j2
index dbd39598b..4b0d0c4f7 100644
--- a/ansible/templates/nchs_mortality-params-prod.json.j2
+++ b/ansible/templates/nchs_mortality-params-prod.json.j2
@@ -1,6 +1,7 @@
 {
   "common": {
     "daily_export_dir": "./daily_receiving",
+    "backup_dir": "./raw_data_backups",
     "log_filename": "/var/log/indicators/nchs_mortality.log",
     "weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
   },
diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py
index 18bbfd59a..ad54e457a 100644
--- a/nchs_mortality/delphi_nchs_mortality/pull.py
+++ b/nchs_mortality/delphi_nchs_mortality/pull.py
@@ -1,15 +1,17 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NCHS mortality data API."""
+import logging
 from typing import Optional
 
 import numpy as np
 import pandas as pd
+from delphi_utils import create_backup_csv
+from delphi_utils.geomap import GeoMapper
 from sodapy import Socrata
 
-from delphi_utils.geomap import GeoMapper
+from .constants import METRICS, NEWLINE, RENAME
 
-from .constants import METRICS, RENAME, NEWLINE
 
 def standardize_columns(df):
     """Rename columns to comply with a standard set.
@@ -22,7 +24,13 @@ def standardize_columns(df):
     return df.rename(columns=dict(rename_pairs))
 
 
-def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None):
+def pull_nchs_mortality_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    logger: Optional[logging.Logger] = None,
+    test_file: Optional[str] = None,
+):
     """Pull the latest NCHS Mortality data and conform it into a dataset.
 
     The output dataset has:
@@ -40,6 +48,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
     ----------
     socrata_token: str
         My App Token for pulling the NCHS mortality data
+    backup_dir: str
+        Directory to which to save raw backup data
+    custom_run: bool
+        Flag indicating if the current run is a patch. If so, don't save any data to disk.
     test_file: Optional[str]
         When not null, name of file from which to read test data
 
@@ -60,6 +72,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
         client = Socrata("data.cdc.gov", socrata_token)
         results = client.get("r8kw-7aab", limit=10**10)
         df = pd.DataFrame.from_records(results)
+
+    create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)
+
+
     if not test_file:
         # drop "By Total" rows
         df = df[df["group"].transform(str.lower) == "by week"]
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index 50ce46cfb..4e88e9d61 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -59,6 +59,8 @@ def run_module(params: Dict[str, Any]):
                                  days=date.today().weekday() + 2)
     export_start_date = export_start_date.strftime('%Y-%m-%d')
     daily_export_dir = params["common"]["daily_export_dir"]
+    backup_dir = params["common"]["backup_dir"]
+    custom_run = params["common"].get("custom_run", False)
     socrata_token = params["indicator"]["socrata_token"]
     test_file = params["indicator"].get("test_file", None)
 
@@ -70,7 +72,9 @@ def run_module(params: Dict[str, Any]):
         daily_arch_diff.update_cache()
 
     stats = []
-    df_pull = pull_nchs_mortality_data(socrata_token, test_file)
+    df_pull = pull_nchs_mortality_data(
+        socrata_token, backup_dir, custom_run=custom_run, test_file=test_file, logger=logger
+    )
     for metric in METRICS:
         for geo in ["state", "nation"]:
             if metric == 'percent_of_expected_deaths':
diff --git a/nchs_mortality/params.json.template b/nchs_mortality/params.json.template
index ed16c620c..2e829de24 100644
--- a/nchs_mortality/params.json.template
+++ b/nchs_mortality/params.json.template
@@ -2,7 +2,8 @@
   "common": {
     "daily_export_dir": "./daily_receiving",
     "weekly_export_dir": "./receiving",
-    "log_filename": "/var/log/indicators/nchs_mortality.log",
+    "backup_dir": "./raw_data_backups",
+    "log_filename": "./nchs_mortality.log",
     "log_exceptions": false
   },
   "indicator": {
diff --git a/nchs_mortality/raw_data_backups/.gitignore b/nchs_mortality/raw_data_backups/.gitignore
new file mode 100644
index 000000000..552154e09
--- /dev/null
+++ b/nchs_mortality/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
+# You should hard commit a prototype for this file, but we
+# want to avoid accidental adding of API tokens and other
+# private data parameters
+params.json
+
+# Do not commit output files
+receiving/*.csv
+
+# Remove macOS files
+.DS_Store
+
+# virtual environment
+dview/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+coverage.xml
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
diff --git a/nchs_mortality/tests/conftest.py b/nchs_mortality/tests/conftest.py
index 6ad0f9c59..383d1c782 100644
--- a/nchs_mortality/tests/conftest.py
+++ b/nchs_mortality/tests/conftest.py
@@ -14,8 +14,10 @@
 
 PARAMS = {
     "common": {
+        "custom_run": True,
         "daily_export_dir": "./daily_receiving",
-        "weekly_export_dir": "./receiving"
+        "weekly_export_dir": "./receiving",
+        "backup_dir": "./raw_data_backups"
     },
     "indicator": {
         "export_start_date": "2020-04-11",
diff --git a/nchs_mortality/tests/raw_data_backups/.gitignore b/nchs_mortality/tests/raw_data_backups/.gitignore
new file mode 100644
index 000000000..2b7efbb36
--- /dev/null
+++ b/nchs_mortality/tests/raw_data_backups/.gitignore
@@ -0,0 +1,2 @@
+*.csv
+*.gz
\ No newline at end of file
diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py
index fa58b04a5..4f18210f6 100644
--- a/nchs_mortality/tests/test_pull.py
+++ b/nchs_mortality/tests/test_pull.py
@@ -1,3 +1,4 @@
+import os
 import pytest
 
 import pandas as pd
@@ -34,7 +35,7 @@ def test_standardize_columns(self):
         pd.testing.assert_frame_equal(expected, df)
 
     def test_good_file(self):
-        df = pull_nchs_mortality_data(SOCRATA_TOKEN, "test_data.csv")
+        df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir="", custom_run=True, test_file="test_data.csv")
 
         # Test columns
         assert (
@@ -90,9 +91,20 @@ def test_good_file(self):
     def test_bad_file_with_inconsistent_time_col(self):
         with pytest.raises(ValueError):
             pull_nchs_mortality_data(
-                SOCRATA_TOKEN, "bad_data_with_inconsistent_time_col.csv"
+                SOCRATA_TOKEN, backup_dir="", custom_run=True, test_file="bad_data_with_inconsistent_time_col.csv"
             )
 
     def test_bad_file_with_missing_cols(self):
         with pytest.raises(ValueError):
-            pull_nchs_mortality_data(SOCRATA_TOKEN, "bad_data_with_missing_cols.csv")
+            pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir="", custom_run=True, test_file="bad_data_with_missing_cols.csv")
+
+    def test_backup_today_data(self):
+        today = pd.Timestamp.today().strftime("%Y%m%d")
+        backup_dir = "./raw_data_backups"
+        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir=backup_dir, custom_run=False, test_file="test_data.csv")
+        backup_file = f"{backup_dir}/{today}.csv.gz"
+        backup_df = pd.read_csv(backup_file)
+        source_df = pd.read_csv("test_data/test_data.csv")
+        pd.testing.assert_frame_equal(source_df, backup_df)
+        if os.path.exists(backup_file):
+            os.remove(backup_file)
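
A few usage notes follow; they are sketches against the code added above, not
part of the commit itself.

First, a minimal sketch of how an indicator can call the new create_backup_csv
helper, assuming a params.json shaped like the template above (with
"backup_dir" and an optional "custom_run" flag under "common"); the DataFrame
here is a stand-in for raw data fetched from a source API.

    import pandas as pd

    from delphi_utils import create_backup_csv, get_structured_logger, read_params

    params = read_params()
    logger = get_structured_logger(
        __name__, filename=params["common"].get("log_filename")
    )

    # Stand-in for data fetched from a source API.
    raw_df = pd.DataFrame({"geo_id": ["pa", "ny"], "val": [1.0, 2.0]})

    # Writes <backup_dir>/<YYYYMMDD>.csv.gz and logs the file name and size;
    # does nothing when custom_run is True (e.g. a patch run).
    create_backup_csv(
        raw_df,
        params["common"]["backup_dir"],
        custom_run=params["common"].get("custom_run", False),
        logger=logger,
    )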
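
The backup filename is assembled by joining whichever of issue, geo_res,
metric, and sensor are provided, in that order, dropping the rest. A small
illustration of that naming logic (the geo and sensor values here are made
up):

    from datetime import datetime

    issue = datetime.today().strftime("%Y%m%d")  # defaults to today, e.g. "20241028"
    parts = [issue, "state", None, "new_counts"]  # [issue, geo_res, metric, sensor]
    name = "_".join(filter(None, parts)) + ".csv.gz"
    # -> "20241028_state_new_counts.csv.gz"; with only the required
    # arguments, the name collapses to "<issue>.csv.gz".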
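
Because the backup is a gzipped CSV, reading it back needs no explicit gzip
handling: pandas infers the compression from the ".gz" suffix, which is what
test_backup_today_data relies on when it compares the backup against the
source file. A sketch of the round trip (paths are illustrative):

    import pandas as pd

    backup_df = pd.read_csv("./raw_data_backups/20241028.csv.gz")
    source_df = pd.read_csv("test_data/test_data.csv")
    pd.testing.assert_frame_equal(source_df, backup_df)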