Add nchs-mortality raw data backups and backup export utility (#2065)
* add helper fn in utils to save backup data to csv

* use helper to save nchs data to disk right after pulling

* add backup dir param

* import backup utility

* update arg name

* add gzip + fix old json template log + remove table_name

* fix current tests to take backup dirs and custom_run flag into account

* add logging

* fix log getsize of backup file

* lint

* lint

* lint

* lint

* add backup test

* remove deep copy

---------

Co-authored-by: minhkhul <minhkhul@andrew.cmu.edu>
Co-authored-by: minhkhul <118945681+minhkhul@users.noreply.github.com>
3 people authored Oct 28, 2024
1 parent efeff2f commit 3450dfc
Showing 10 changed files with 245 additions and 19 deletions.
13 changes: 6 additions & 7 deletions _delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
from __future__ import absolute_import

from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
from .export import create_export_csv
from .utils import read_params

from .slack_notifier import SlackNotifier
from .logger import get_structured_logger
from .export import create_backup_csv, create_export_csv
from .geomap import GeoMapper
from .smooth import Smoother
from .signal import add_prefix
from .logger import get_structured_logger
from .nancodes import Nans
from .signal import add_prefix
from .slack_notifier import SlackNotifier
from .smooth import Smoother
from .utils import read_params
from .weekday import Weekday

__version__ = "0.3.25"
75 changes: 72 additions & 3 deletions _delphi_utils_python/delphi_utils/export.py
@@ -1,16 +1,18 @@
"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
import gzip
import logging
from datetime import datetime
from os.path import join
from os.path import getsize, join
from typing import Optional
import logging

from epiweeks import Week
import numpy as np
import pandas as pd
from epiweeks import Week

from .nancodes import Nans


def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
columns = ["val", "se", "sample_size"]
@@ -132,3 +134,70 @@ def create_export_csv(
export_df = export_df.sort_values(by="geo_id")
export_df.to_csv(export_file, index=False, na_rep="NA")
return dates


def create_backup_csv(
    df: pd.DataFrame,
    backup_dir: str,
    custom_run: bool,
    issue: Optional[str] = None,
    geo_res: Optional[str] = None,
    sensor: Optional[str] = None,
    metric: Optional[str] = None,
    logger: Optional[logging.Logger] = None,
):
    """Save data for use as a backup.

    This function is meant to save raw data fetched from data sources.
    Therefore, it avoids manipulating the data as much as possible to
    preserve the input.

    When only required arguments are passed, data will be saved to a file of
    the format `<backup_dir>/<today's date as YYYYMMDD>.csv.gz`. Optional arguments
    should be passed if the source data is fetched from different tables or
    in batches by signal, geo, etc.

    Parameters
    ----------
    df: pd.DataFrame
        Columns: geo_id, timestamp, val, se, sample_size
    backup_dir: str
        Backup directory
    custom_run: bool
        Flag indicating if the current run is a patch, or other run where
        backups aren't needed. If so, don't save any data to disk
    issue: Optional[str]
        The date the data was fetched, in YYYYMMDD format. Defaults to "today"
        if not provided
    geo_res: Optional[str]
        Geographic resolution of the data
    sensor: Optional[str]
        Sensor that has been calculated (cumulative_counts vs new_counts)
    metric: Optional[str]
        Metric we are considering, if any.
    logger: Optional[logging.Logger]
        Pass a logger object here to log information about the name and size of the backup file.
    """
    if not custom_run:
        # Label the file with today's date (the date the data was fetched).
        if not issue:
            issue = datetime.today().strftime("%Y%m%d")

        backup_filename = [issue, geo_res, metric, sensor]
        backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
        backup_file = join(backup_dir, backup_filename)

        with gzip.open(backup_file, "wt", newline="") as f:
            df.to_csv(f, index=False, na_rep="NA")

        if logger:
            logger.info(
                "Backup file created",
                backup_file=backup_file,
                backup_size=getsize(backup_file),
            )
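As a usage sketch (not part of this commit; the DataFrame, paths, and labels below are made-up examples), an indicator might call the helper like this:

import pandas as pd
from delphi_utils import create_backup_csv, get_structured_logger

raw_df = pd.DataFrame({"geo_id": ["pa", "nj"], "timestamp": ["2024-10-26"] * 2, "val": [1.0, 2.0]})
logger = get_structured_logger(__name__)

# Minimal form: writes ./raw_data_backups/<YYYYMMDD>.csv.gz (the directory must already exist);
# nothing is written when custom_run is True.
create_backup_csv(raw_df, backup_dir="./raw_data_backups", custom_run=False, logger=logger)

# Batched form: the optional labels are joined into the filename,
# e.g. ./raw_data_backups/<YYYYMMDD>_state_new_counts.csv.gz.
create_backup_csv(raw_df, "./raw_data_backups", custom_run=False, geo_res="state", sensor="new_counts")

The nchs-mortality changes below use only the minimal form, since the source is pulled in a single fetch.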
1 change: 1 addition & 0 deletions ansible/templates/nchs_mortality-params-prod.json.j2
@@ -1,6 +1,7 @@
{
"common": {
"daily_export_dir": "./daily_receiving",
"backup_dir": "./raw_data_backups",
"log_filename": "/var/log/indicators/nchs_mortality.log",
"weekly_export_dir": "/common/covidcast/receiving/nchs-mortality"
},
22 changes: 19 additions & 3 deletions nchs_mortality/delphi_nchs_mortality/pull.py
@@ -1,15 +1,17 @@
# -*- coding: utf-8 -*-
"""Functions for pulling NCHS mortality data API."""

import logging
from typing import Optional

import numpy as np
import pandas as pd
from delphi_utils import create_backup_csv
from delphi_utils.geomap import GeoMapper
from sodapy import Socrata

from delphi_utils.geomap import GeoMapper
from .constants import METRICS, NEWLINE, RENAME

from .constants import METRICS, RENAME, NEWLINE

def standardize_columns(df):
"""Rename columns to comply with a standard set.
@@ -22,7 +24,13 @@ def standardize_columns(df):
return df.rename(columns=dict(rename_pairs))


def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None):
def pull_nchs_mortality_data(
    socrata_token: str,
    backup_dir: str,
    custom_run: bool,
    logger: Optional[logging.Logger] = None,
    test_file: Optional[str] = None,
):
    """Pull the latest NCHS Mortality data, and conform it into a dataset.

    The output dataset has:
@@ -40,6 +48,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
    ----------
    socrata_token: str
        My App Token for pulling the NCHS mortality data
    backup_dir: str
        Directory to which to save raw backup data
    custom_run: bool
        Flag indicating if the current run is a patch. If so, don't save any data to disk
    test_file: Optional[str]
        When not null, name of file from which to read test data
@@ -60,6 +72,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None
client = Socrata("data.cdc.gov", socrata_token)
results = client.get("r8kw-7aab", limit=10**10)
df = pd.DataFrame.from_records(results)

create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger)

if not test_file:
# drop "By Total" rows
df = df[df["group"].transform(str.lower) == "by week"]

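A hedged sketch of exercising the new signature locally (the token is a placeholder; the production wiring is shown in run.py below, and the same pattern appears in the new backup test):

from delphi_utils import get_structured_logger
from delphi_nchs_mortality.pull import pull_nchs_mortality_data

logger = get_structured_logger(__name__)
# With test_file set, the bundled CSV (resolved relative to the working directory, as in the
# tests) is read instead of querying Socrata, but a raw backup is still written to
# ./raw_data_backups/<YYYYMMDD>.csv.gz because custom_run is False.
df = pull_nchs_mortality_data(
    "PLACEHOLDER_SOCRATA_TOKEN",
    backup_dir="./raw_data_backups",
    custom_run=False,
    logger=logger,
    test_file="test_data.csv",
)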
6 changes: 5 additions & 1 deletion nchs_mortality/delphi_nchs_mortality/run.py
@@ -59,6 +59,8 @@ def run_module(params: Dict[str, Any]):
days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
daily_export_dir = params["common"]["daily_export_dir"]
backup_dir = params["common"]["backup_dir"]
custom_run = params["common"].get("custom_run", False)
socrata_token = params["indicator"]["socrata_token"]
test_file = params["indicator"].get("test_file", None)

@@ -70,7 +72,9 @@ def run_module(params: Dict[str, Any]):
daily_arch_diff.update_cache()

stats = []
df_pull = pull_nchs_mortality_data(socrata_token, test_file)
df_pull = pull_nchs_mortality_data(
socrata_token, backup_dir, custom_run=custom_run, test_file=test_file, logger=logger
)
for metric in METRICS:
for geo in ["state", "nation"]:
if metric == 'percent_of_expected_deaths':
3 changes: 2 additions & 1 deletion nchs_mortality/params.json.template
@@ -2,7 +2,8 @@
"common": {
"daily_export_dir": "./daily_receiving",
"weekly_export_dir": "./receiving",
"log_filename": "/var/log/indicators/nchs_mortality.log",
"backup_dir": "./raw_data_backups",
"log_filename": "./nchs_mortality.log",
"log_exceptions": false
},
"indicator": {
120 changes: 120 additions & 0 deletions nchs_mortality/raw_data_backups/.gitignore
@@ -0,0 +1,120 @@
# You should hard commit a prototype for this file, but we
# want to avoid accidental adding of API tokens and other
# private data parameters
params.json

# Do not commit output files
receiving/*.csv

# Remove macOS files
.DS_Store

# virtual environment
dview/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
coverage.xml
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
.static_storage/
.media/
local_settings.py

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
4 changes: 3 additions & 1 deletion nchs_mortality/tests/conftest.py
@@ -14,8 +14,10 @@

PARAMS = {
"common": {
"custom_run": True,
"daily_export_dir": "./daily_receiving",
"weekly_export_dir": "./receiving"
"weekly_export_dir": "./receiving",
"backup_dir": "./raw_data_backups"
},
"indicator": {
"export_start_date": "2020-04-11",
2 changes: 2 additions & 0 deletions nchs_mortality/tests/raw_data_backups/.gitignore
@@ -0,0 +1,2 @@
*.csv
*.gz
18 changes: 15 additions & 3 deletions nchs_mortality/tests/test_pull.py
@@ -1,3 +1,4 @@
import os
import pytest

import pandas as pd
@@ -34,7 +35,7 @@ def test_standardize_columns(self):
pd.testing.assert_frame_equal(expected, df)

def test_good_file(self):
df = pull_nchs_mortality_data(SOCRATA_TOKEN, "test_data.csv")
df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "test_data.csv")

# Test columns
assert (
@@ -90,9 +91,20 @@ def test_good_file(self):
def test_bad_file_with_inconsistent_time_col(self):
with pytest.raises(ValueError):
pull_nchs_mortality_data(
SOCRATA_TOKEN, "bad_data_with_inconsistent_time_col.csv"
SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_inconsistent_time_col.csv"
)

def test_bad_file_with_missing_cols(self):
with pytest.raises(ValueError):
pull_nchs_mortality_data(SOCRATA_TOKEN, "bad_data_with_missing_cols.csv")
pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv")

    def test_backup_today_data(self):
        today = pd.Timestamp.today().strftime("%Y%m%d")
        backup_dir = "./raw_data_backups"
        pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv")
        backup_file = f"{backup_dir}/{today}.csv.gz"
        backup_df = pd.read_csv(backup_file)
        source_df = pd.read_csv("test_data/test_data.csv")
        pd.testing.assert_frame_equal(source_df, backup_df)
        if os.path.exists(backup_file):
            os.remove(backup_file)
