diff --git a/.bumpversion.cfg b/.bumpversion.cfg index de364861c..547bad12d 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.3.55 +current_version = 0.3.56 commit = True message = chore: bump covidcast-indicators to {new_version} tag = False diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index 0dcfa85c2..66b0afc62 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -12,7 +12,9 @@ The production branch is configured to automatically deploy to our production en * everything else -All other branches are development branches. We don't enforce a naming policy. +All other branches are development branches. We don't enforce a naming policy, but it is recommended to prefix all branches you create with your name, username, or initials (e.g. `username/branch-name`). + +We don't forbid force-pushing, but please keep to a minimum and be careful of using when modifying a branch at the same time as others. ## Issues @@ -29,7 +31,7 @@ So, how does one go about developing a pipeline for a new data source? ### tl;dr 1. Create your new indicator branch from `main`. -2. Build it using the appropriate template, following the guidelines in the included README.md and REVIEW.md files. +2. Build it using the [indicator template](https://github.com/cmu-delphi/covidcast-indicators/tree/main/_template_python), following the guidelines in the included README.md, REVIEW.md, and INDICATOR_DEV_GUIDE.md files. 3. Make some stuff! 4. When your stuff works, push your development branch to remote, and open a PR against `main` for review. 5. Once your PR has been merged, consult with a platform engineer for the remaining production setup needs. They will create a deployment workflow for your indicator including any necessary production parameters. Production secrets are encrypted in the Ansible vault. This workflow will be tested in staging by admins, who will consult you about any problems they encounter. @@ -50,7 +52,7 @@ git checkout -b dev-my-feature-branch ### Creating your indicator -Create a directory for your new indicator by making a copy of `_template_r` or `_template_python` depending on the programming language you intend to use. If using Python, add the name of the directory to the list found in `jobs > build > strategy > matrix > packages` in `.github/workflows/python-ci.yml`, which will enable automated checks for your indicator when you make PRs. The template copies of `README.md` and `REVIEW.md` include the minimum requirements for code structure, documentation, linting, testing, and method of configuration. Beyond that, we don't have any established restrictions on implementation; you can look at other existing indicators see some examples of code layout, organization, and general approach. +Create a directory for your new indicator by making a copy of `_template_python`. (We also make a `_template_r` available, but R should be only used as a last resort, due to complications using it in production.) Add the name of the directory to the list found in `jobs > build > strategy > matrix > packages` in `.github/workflows/python-ci.yml`, which will enable automated checks for your indicator when you make PRs. The template copies of `README.md` and `REVIEW.md` include the minimum requirements for code structure, documentation, linting, testing, and method of configuration. Beyond that, we don't have any established restrictions on implementation; you can look at other existing indicators see some examples of code layout, organization, and general approach. * Consult your peers with questions! :handshake: @@ -62,7 +64,7 @@ Once you have something that runs locally and passes tests you set up your remot git push -u origin dev-my-feature-branch ``` -You can then draft public API documentation for people who would fetch this +You can then draft [public API documentation](https://cmu-delphi.github.io/delphi-epidata/) for people who would fetch this data from the API. Public API documentation is kept in the delphi-epidata repository, and there is a [template Markdown file](https://github.com/cmu-delphi/delphi-epidata/blob/main/docs/api/covidcast-signals/_source-template.md) @@ -104,7 +106,8 @@ We use a branch-based git workflow coupled with [Jenkins](https://www.jenkins.io * Package - Tar and gzip the built environment. * Deploy - Trigger an Ansible playbook to place the built package onto the runtime host, place any necessary production configuration, and adjust the runtime envirnemnt (if necessary). -There are several additional Jenkins-specific files that will need to be created for each indicator, as well as some configuration additions to the runtime host. It will be important to pair with a platform engineer to prepare the necessary production environment needs, test the workflow, validate on production, and ultimately sign off on a production release. +There are several additional Jenkins-specific files that will need to be created for each indicator, as well as some configuration additions to the runtime host. +It will be important to pair with a platform engineer to prepare the necessary production environment needs, test the workflow, validate on production, and ultimately sign off on a production release. ### Preparing container images of indicators diff --git a/.github/workflows/publish-release.yml b/.github/workflows/publish-release.yml index a9f1ccae9..e7cd8548b 100644 --- a/.github/workflows/publish-release.yml +++ b/.github/workflows/publish-release.yml @@ -86,7 +86,7 @@ jobs: - name: Release run: | make release - - uses: actions/upload-artifact@v2 + - uses: actions/upload-artifact@v4 with: name: delphi_utils path: _delphi_utils_python/dist/*.tar.gz diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml index 3e1ee9689..6e23c61d2 100644 --- a/.github/workflows/python-ci.yml +++ b/.github/workflows/python-ci.yml @@ -51,7 +51,7 @@ jobs: with: python-version: 3.8 cache: "pip" - cache-dependency-path: "setup.py" + cache-dependency-path: "pyproject.toml" - name: Install testing dependencies run: | python -m pip install --upgrade pip diff --git a/Jenkinsfile b/Jenkinsfile index 1b9485ca5..3fcdc904a 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -18,6 +18,10 @@ def deploy_production = [:] pipeline { agent any + environment { + // Set the PATH variable to include the pyenv shims directory. + PATH = "/var/lib/jenkins/.pyenv/shims:${env.PATH}" + } stages { stage('Build dev/feature branch') { when { diff --git a/_delphi_utils_python/.bumpversion.cfg b/_delphi_utils_python/.bumpversion.cfg index 722a91e30..08040f0ec 100644 --- a/_delphi_utils_python/.bumpversion.cfg +++ b/_delphi_utils_python/.bumpversion.cfg @@ -1,9 +1,9 @@ [bumpversion] -current_version = 0.3.24 +current_version = 0.3.25 commit = True message = chore: bump delphi_utils to {new_version} tag = False -[bumpversion:file:setup.py] +[bumpversion:file:pyproject.toml] [bumpversion:file:delphi_utils/__init__.py] diff --git a/_delphi_utils_python/DEVELOP.md b/_delphi_utils_python/DEVELOP.md index 2407e29a8..53ffde93f 100644 --- a/_delphi_utils_python/DEVELOP.md +++ b/_delphi_utils_python/DEVELOP.md @@ -9,7 +9,7 @@ To install the module in your default version of Python, run the following from this directory: ``` -pip install . +pip install -e '.[dev]' ``` As described in each of the indicator code directories, you will want to install diff --git a/_delphi_utils_python/Makefile b/_delphi_utils_python/Makefile index 79d7f7943..60f7d2af0 100644 --- a/_delphi_utils_python/Makefile +++ b/_delphi_utils_python/Makefile @@ -6,12 +6,12 @@ venv: install: venv . env/bin/activate; \ pip install wheel ; \ - pip install -e . + pip install -e '.[dev]' install-ci: venv . env/bin/activate; \ - pip install wheel ; \ - pip install . + pip install 'build[virtualenv]' pylint pytest pydocstyle wheel twine ; \ + pip install '.[dev]' lint: . env/bin/activate; pylint delphi_utils --rcfile=../pyproject.toml @@ -30,4 +30,5 @@ clean: release: lint test . env/bin/activate ; \ - python setup.py sdist bdist_wheel + pip install 'build[virtualenv]' ; \ + python -m build --sdist --wheel \ No newline at end of file diff --git a/_delphi_utils_python/README.md b/_delphi_utils_python/README.md index 0ac6350bf..e1b091cae 100644 --- a/_delphi_utils_python/README.md +++ b/_delphi_utils_python/README.md @@ -22,13 +22,22 @@ Source code can be found here: ## Logger Usage +To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc). + +### Commonly used argument names: +- data_source +- geo_type +- signal +- issue_date +- filename + Single-thread usage. ```py from delphi_utils.logger import get_structured_logger logger = get_structured_logger('my_logger') -logger.info('Hello, world!') +logger.info('Hello', name='World') ``` Multi-thread usage. diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py index 7ff828440..ca5693eaf 100644 --- a/_delphi_utils_python/delphi_utils/__init__.py +++ b/_delphi_utils_python/delphi_utils/__init__.py @@ -4,15 +4,14 @@ from __future__ import absolute_import from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer -from .export import create_export_csv -from .utils import read_params - -from .slack_notifier import SlackNotifier -from .logger import get_structured_logger +from .export import create_backup_csv, create_export_csv from .geomap import GeoMapper -from .smooth import Smoother -from .signal import add_prefix +from .logger import get_structured_logger from .nancodes import Nans +from .signal import add_prefix +from .slack_notifier import SlackNotifier +from .smooth import Smoother +from .utils import read_params from .weekday import Weekday -__version__ = "0.3.24" +__version__ = "0.3.25" diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 37f8faf98..82493032e 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -1,16 +1,18 @@ """Export data in the format expected by the Delphi API.""" # -*- coding: utf-8 -*- +import gzip +import logging from datetime import datetime -from os.path import join +from os.path import getsize, join from typing import Optional -import logging -from epiweeks import Week import numpy as np import pandas as pd +from epiweeks import Week from .nancodes import Nans + def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): """Find values with contradictory missingness codes, filter them, and log.""" columns = ["val", "se", "sample_size"] @@ -22,8 +24,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): for mask in masks: if not logger is None and df.loc[mask].size > 0: logger.info( - "Filtering contradictory missing code in " + - "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + "Filtering contradictory missing code", + sensor=sensor, + metric=metric, + date=date.strftime(format="%Y-%m-%d"), ) df = df.loc[~mask] elif logger is None and df.loc[mask].size > 0: @@ -130,3 +134,70 @@ def create_export_csv( export_df = export_df.sort_values(by="geo_id") export_df.to_csv(export_file, index=False, na_rep="NA") return dates + + +def create_backup_csv( + df: pd.DataFrame, + backup_dir: str, + custom_run: bool, + issue: Optional[str] = None, + geo_res: Optional[str] = None, + sensor: Optional[str] = None, + metric: Optional[str] = None, + logger: Optional[logging.Logger] = None, +): + """Save data for use as a backup. + + This function is meant to save raw data fetched from data sources. + Therefore, it avoids manipulating the data as much as possible to + preserve the input. + + When only required arguments are passed, data will be saved to a file of + the format `/.csv`. Optional arguments + should be passed if the source data is fetched from different tables or + in batches by signal, geo, etc. + + Parameters + ---------- + df: pd.DataFrame + Columns: geo_id, timestamp, val, se, sample_size + backup_dir: str + Backup directory + custom_run: bool + Flag indicating if the current run is a patch, or other run where + backups aren't needed. If so, don't save any data to disk + issue: Optional[str] + The date the data was fetched, in YYYYMMDD format. Defaults to "today" + if not provided + geo_res: Optional[str] + Geographic resolution of the data + sensor: Optional[str] + Sensor that has been calculated (cumulative_counts vs new_counts) + metric: Optional[str] + Metric we are considering, if any. + logger: Optional[logging.Logger] + Pass a logger object here to log information about name and size of the backup file. + + Returns + --------- + dates: pd.Series[datetime] + Series of dates for which CSV files were exported. + """ + if not custom_run: + # Label the file with today's date (the date the data was fetched). + if not issue: + issue = datetime.today().strftime("%Y%m%d") + + backup_filename = [issue, geo_res, metric, sensor] + backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz" + backup_file = join(backup_dir, backup_filename) + + with gzip.open(backup_file, "wt", newline="") as f: + df.to_csv(f, index=False, na_rep="NA") + + if logger: + logger.info( + "Backup file created", + backup_file=backup_file, + backup_size=getsize(backup_file), + ) diff --git a/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py b/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py index 660fca042..3c8012803 100644 --- a/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py +++ b/_delphi_utils_python/delphi_utils/flash_eval/eval_day.py @@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger): p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n" else: break - name = f"Signal: {signal} Lag: {lag}" - logger.info(name, payload=p_text) + logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text) def evd_ranking_fn(ts_streams, EVD_max, EVD_min): diff --git a/_delphi_utils_python/delphi_utils/geomap.py b/_delphi_utils_python/delphi_utils/geomap.py index be6df2d24..f6df4a919 100644 --- a/_delphi_utils_python/delphi_utils/geomap.py +++ b/_delphi_utils_python/delphi_utils/geomap.py @@ -443,7 +443,7 @@ def add_population_column( --------- data: pd.DataFrame The dataframe with a FIPS code column. - geocode_type: {"fips", "zip"} + geocode_type: The type of the geocode contained in geocode_col. geocode_col: str, default None The name of the column containing the geocodes. If None, uses the geocode_type @@ -671,8 +671,10 @@ def aggregate_by_weighted_sum( to a from_geo, e.g. "wastewater collection site"). to_geo: str The column name of the geocode to aggregate to. - sensor: str + sensor_col: str The column name of the sensor to aggregate. + time_col: str + The column name of the timestamp to aggregate over. population_column: str The column name of the population to weight the sensor by. diff --git a/_delphi_utils_python/delphi_utils/logger.py b/_delphi_utils_python/delphi_utils/logger.py index c0e4502a8..30fd78059 100644 --- a/_delphi_utils_python/delphi_utils/logger.py +++ b/_delphi_utils_python/delphi_utils/logger.py @@ -1,11 +1,11 @@ """Structured logger utility for creating JSON logs. -See the delphi_utils README.md for usage examples. +To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, +the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), +and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument +to the logger call (for use in filtering, thresholding, grouping, visualization, etc) -The Delphi group uses two ~identical versions of this file. -Try to keep them in sync with edits, for sanity. - https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py - https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py +See the delphi_utils README.md for usage examples. """ import contextlib diff --git a/_delphi_utils_python/delphi_utils/runner.py b/_delphi_utils_python/delphi_utils/runner.py index abc28ba19..9083371aa 100644 --- a/_delphi_utils_python/delphi_utils/runner.py +++ b/_delphi_utils_python/delphi_utils/runner.py @@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], #Get version and indicator name for startup ind_name = indicator_fn.__module__.replace(".run", "") + #Check for version.cfg in indicator directory if os.path.exists("version.cfg"): with open("version.cfg") as ver_file: @@ -59,9 +60,15 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], if "current_version" in line: current_version = str.strip(line) current_version = current_version.replace("current_version = ", "") - #Logging - Starting Indicator - logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}") - else: logger.info(f"Started {ind_name} without version.cfg") + logger.info( + "Started a covidcast-indicator", + indicator_name=ind_name, + current_version=current_version, + ) + else: + logger.info( + "Started a covidcast-indicator without version.cfg", indicator_name=ind_name + ) indicator_fn(params) validator = validator_fn(params) @@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None], break time.sleep(1) else: - logger.error(f"Flash step timed out ({timer} s), terminating", - elapsed_time_in_seconds = round(time.time() - start, 2)) + logger.error( + "Flash step timed out, terminating", + elapsed_time_in_seconds=round(time.time() - start, 2), + ) t1.terminate() t1.join() if validator: diff --git a/_delphi_utils_python/pyproject.toml b/_delphi_utils_python/pyproject.toml new file mode 100644 index 000000000..c47590a29 --- /dev/null +++ b/_delphi_utils_python/pyproject.toml @@ -0,0 +1,57 @@ +[build-system] +requires = ["setuptools", "setuptools-scm>=8.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "delphi-utils" +version = "0.3.25" +description = "Shared Utility Functions for Indicators" +readme = "README.md" +requires-python = "== 3.8.*" +license = { text = "MIT License" } +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.8", + "License :: OSI Approved :: MIT License", +] +dependencies = [ + "boto3", + "covidcast", + "cvxpy", + "epiweeks", + "gitpython", + "importlib_resources>=1.3", + "numpy", + "pandas>=1.1.0", + "requests", + "slackclient", + "scs<3.2.6", # TODO: remove this ; it is a cvxpy dependency, and the excluded version appears to break our jenkins build. see: https://github.com/cvxgrp/scs/issues/283 + "structlog", + "xlrd", # needed by Pandas to read Excel files +] + +[project.urls] +Homepage = "https://github.com/cmu-delphi/covidcast-indicators" + +[project.optional-dependencies] +dev = [ + "darker[isort]~=2.1.1", + "pylint==2.8.3", + "pytest", + "pydocstyle", + "pytest-cov", + "mock", + "moto~=4.2.14", + "requests-mock", + "freezegun", +] +flash = ["scipy"] + +[tool.setuptools.packages.find] +where = ["."] +include = ["delphi_utils"] +namespaces = true + +[tool.setuptools.package-data] +"delphi_utils.data" = ["20*/*.csv"] diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py deleted file mode 100644 index 3dee89b53..000000000 --- a/_delphi_utils_python/setup.py +++ /dev/null @@ -1,48 +0,0 @@ -from setuptools import setup -from setuptools import find_packages - -with open("README.md", "r") as f: - long_description = f.read() - -required = [ - "boto3", - "covidcast", - "cvxpy", - "scs<3.2.6", # TODO: remove this ; it is a cvxpy dependency, and the excluded version appears to break our jenkins build. see: https://github.com/cvxgrp/scs/issues/283 - "darker[isort]~=2.1.1", - "epiweeks", - "freezegun", - "gitpython", - "importlib_resources>=1.3", - "mock", - "moto~=4.2.14", - "numpy", - "pandas>=1.1.0", - "pydocstyle", - "pylint==2.8.3", - "pytest-cov", - "pytest", - "requests-mock", - "slackclient", - "structlog", - "xlrd" -] - -setup( - name="delphi_utils", - version="0.3.24", - description="Shared Utility Functions for Indicators", - long_description=long_description, - long_description_content_type="text/markdown", - author="", - author_email="", - url="https://github.com/cmu-delphi/", - install_requires=required, - classifiers=[ - "Development Status :: 5 - Production/Stable", - "Intended Audience :: Developers", - "Programming Language :: Python :: 3.8", - ], - packages=find_packages(), - package_data={'': ['data/20*/*.csv']} -) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 3e72f1d7f..c9c1f8483 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -323,12 +323,13 @@ def test_export_df_with_missingness(self, tmp_path): @mock.patch("delphi_utils.logger") def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path): - + sensor = "test" + geo_res = "state" create_export_csv( df=self.DF3.copy(), export_dir=tmp_path, - geo_res="state", - sensor="test", + sensor=sensor, + geo_res=geo_res, logger=mock_logger ) assert set(listdir(tmp_path)) == set( @@ -339,8 +340,9 @@ def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path): ] ) assert pd.read_csv(join(tmp_path, "20200315_state_test.csv")).size > 0 + date_str = datetime.strftime(self.TIMES[0], "%Y-%m-%d") mock_logger.info.assert_called_once_with( - "Filtering contradictory missing code in test_None_2020-02-15." + "Filtering contradictory missing code", sensor=sensor, metric=None, date=date_str ) def test_export_sort(self, tmp_path): diff --git a/_template_python/INDICATOR_DEV_GUIDE.md b/_template_python/INDICATOR_DEV_GUIDE.md index 2b8b6333c..f58a1ada9 100644 --- a/_template_python/INDICATOR_DEV_GUIDE.md +++ b/_template_python/INDICATOR_DEV_GUIDE.md @@ -50,6 +50,7 @@ This is the general extract-transform-load procedure used by all COVIDcast indic 7. Deliver the CSV output files to the `receiving/` directory on the API server. Adding a new indicator typically means implementing steps 1-3. Step 4 is included via the function ` create_export_csv`. Steps 5 (the validator), 6 (the archive differ) and 7 (acquisition) are all handled by runners in production. + ## Step 0: Keep revision history (important!) If the data provider doesn’t provide or it is unclear if they provide historical versions of the data, immediately set up a script (bash, Python, etc) to automatically (e.g. cron) download the data every day and save locally with versioning. @@ -195,7 +196,14 @@ Generally, indicators have: Do other geo handling (e.g. finding and reporting DC as a state). * `constants.py`: Lists of geos to produce, signals to produce, dataset ids, data source URL, etc. -Your code should be _extensively_ commented! Especially note sections where you took an unusual approach (make sure to say why and consider briefly discussing alternate approaches). +Your code should be _extensively_ commented! Especially note sections where you took an unusual approach (make sure to say why and consider briefly discussing alternate approaches that were discarded or could be useful in the future). + +#### Development environment + +Make sure you have a functional environment with python 3.8.15+. +For local runs, the makefile’s make install target will set up a local virtual environment with necessary packages. + +(If working in R (very much NOT recommended), local runs can be run without a virtual environment or using the [`renv` package](https://rstudio.github.io/renv/articles/renv.html), but production runs should be set up to use Docker.) #### Function stubs @@ -235,12 +243,22 @@ def api_call(token: str): After that, generalize your code to be able to be run on all geos of interest, take settings from params.json, use constants for easy maintenance, with extensive documentation, etc. -#### Development environment +#### Testing -Make sure you have a functional environment with python 3.8.15+. -For local runs, the makefile’s make install target will set up a local virtual environment with necessary packages. +As a general rule, it helps to decompose your functions into operations for which you can write unit tests. +To run the tests, use `make test` in the top-level indicator directory. -(If working in R (very much NOT recommended), local runs can be run without a virtual environment or using the [`renv` package](https://rstudio.github.io/renv/articles/renv.html), but production runs should be set up to use Docker.) +Unit tests are required for all functions. +Integration tests are highly desired, but may be difficult to set up depending on where the data is being fetched from. +Mocking functions are useful in this case. + +#### Dealing with dates + +We keep track of two different date fields for each dataset. The first field is called "reference value" (field name `time_value`) and tracks the date that a value is reported _for_, that is, when the event happened. The second field is called "issue date" or "version" (field name `issue`) and tracks when a value was recorded, not when it happened. + +For example, flu test positivity of 80% for a reference date of Jan 1 and an issue date of Jan 5 means that _on_ Jan 1, the test positivity rate was 80%. But we only received and recorded the value on Jan 5, 4 days later (AKA a lag of 4 days). + +It's important to track issue date because many data sources are revised over time, and reported values can change substantially between issues. #### Dealing with data-types @@ -255,14 +273,16 @@ E.g. which geo values are allowed, should every valid date be present in some wa In an ideal case, the data exists at one of our [already covered geos](https://cmu-delphi.github.io/delphi-epidata/api/covidcast_geography.html): -* State: state_code (string, leftpadded to 2 digits with 0) or state_id (string) +* Zip code * FIPS (state+county codes, string leftpadded to 5 digits with 0) -* ZIP * MSA (metro statistical area, int) * HRR (hospital referral region, int) +* State: state_code (string, leftpadded to 2 digits with 0) or state_id (string) +* HHS ([Department of Health and Human Services-defined regions](https://www.hhs.gov/about/agencies/iea/regional-offices/index.html)) +* Nation -If you want to map from one of these to another, the [`delphi_utils.geomapper`](https://github.com/cmu-delphi/covidcast-indicators/blob/6912077acba97e835aff7d0cd3d64309a1a9241d/_delphi_utils_python/delphi_utils/geomap.py) utility covers most cases. -A brief example of aggregating from states to hhs regions via their population: +If you want to map from one of these to a higher level, the [`delphi_utils.geomapper`](https://github.com/cmu-delphi/covidcast-indicators/blob/6912077acba97e835aff7d0cd3d64309a1a9241d/_delphi_utils_python/delphi_utils/geomap.py) utility covers most cases. +Here's a brief example of aggregating from states to hhs regions via their population: ```{python} from delphi_utils.geomap import GeoMapper @@ -274,19 +294,6 @@ hhs_version = geo_mapper.replace_geocode(df, "state_code","hhs", new_col = "geo_ This example is taken from [`hhs_hosp`](https://github.com/cmu-delphi/covidcast-indicators/blob/main/hhs_hosp/delphi_hhs/run.py); more documentation can be found in the `geomapper` class definition. -#### Implement a Missing Value code system - -The column is described [here](https://cmu-delphi.github.io/delphi-epidata/api/missing_codes.html). - -#### Local testing - -As a general rule, it helps to decompose your functions into operations for which you can write unit tests. -To run the tests, use `make test` in the top-level indicator directory. - -Unit tests are required for all functions. -Integration tests are highly desired, but may be difficult to set up depending on where the data is being fetched from. -Mocking functions are useful in this case. - #### Naming Indicator and signal names need to be approved by [@RoniRos](https://www.github.com/RoniRos). @@ -324,6 +331,52 @@ Using this tag dictionary, we can interpret the following signals as * `confirmed_admissions_influenza_1d_prop` = raw (unsmoothed) daily ("1d") confirmed influenza hospital admissions ("confirmed_admissions_influenza") per 100,000 population ("prop"). * `confirmed_admissions_influenza_1d_prop_7dav` = the same as above, but smoothed with a 7-day moving average ("7dav"). +#### Implement a Missing Value code system + +The column is described [here](https://cmu-delphi.github.io/delphi-epidata/api/missing_codes.html). + +#### Implement a patching method + +After normal data reporting is restored following an outage, we would like to be able to easily reconstruct the version history of the data. To do so, implement a `patch` method that runs an indicator's main `run_module` for every issue date in a range. An [example patch module](https://github.com/cmu-delphi/covidcast-indicators/blob/b784f30/google_symptoms/delphi_google_symptoms/patch.py). + +An outage can be external to Delphi, e.g. the data provider was unable to provide new data on the historically-expected schedule, or internal, e.g. Delphi code had a bug that caused our pipeline to fail. The goal of the patch feature is to recreate every missing issue _as if we had ingested it on the correct day_. + +The patch feature should be easy to use. The only manual parts should be modifying `params.json`, and running the patch module and acquisition. Any setup that needs to be done (e.g. cache creation, dir creation) should be done automatically as part of the patch function. + +All patch modules should expect settings from `params.json` of the form + +``` +{ + "common": { + ... + "custom_run": true + }, + "validation": { + ... + }, + "patch": { + "patch_dir": "/", + "start_issue": "2024-04-20", + "end_issue": "2024-04-21" + } +} +``` + +The `custom_run` parameter should [default to false](https://github.com/cmu-delphi/covidcast-indicators/blob/d435bf0f0d5880ddf8905ea60f242976e6702342/nssp/delphi_nssp/run.py#L73), and [warn](https://github.com/cmu-delphi/covidcast-indicators/blob/d435bf0f0d5880ddf8905ea60f242976e6702342/nssp/delphi_nssp/run.py#L75-L83) if parameters and arguments disagree. + +Patching should generate data for that range of issue dates, and store them in batch issue format: +`/issue_//xxx.csv`. + +Acquisition in `delphi-epidata` includes [code that allow files in this issue-specific structure](https://github.com/cmu-delphi/delphi-epidata/blob/694d89ad763fa85bd644e1f64552c9bc85f688ef/src/acquisition/covidcast/csv_to_database.py#L43C32-L43C61) to be added to the database. This output format is designed to match the `issue`-type acquisition format. The issue-specific mode is triggered with the flag `specific_issue_date`. [A Cronicle job](https://cronicle-prod-01.delphi.cmu.edu/#Schedule?sub=edit_event&id=elh59ynwobf) has already been set up to call acquisition using the flag; please use it to load patches into the database. + +Sometimes source data is already versioned, and to reconstruct an issue we simply need to filter the source data to include only values that would have been available on that issue day. If we receive data drops directly, we can filter by the file creation date instead. + +However, it is not always possible to reconstruct issues; many datasets aren't versioned by the provider. If a source has no revisions (for example, `google-symptoms`), then we can guess which dates of data would have been available that issue day based on the normal lag of the source. For example, `google-symptoms` normally has a lag of 4 days, i.e. "today" the most recent data we see in the source data is from 4 days ago. So to reconstruct data for issue 2024-01-10, we just need to report data with a `time_value` (reference date) from 2024-01-06 and earlier. (How much earlier depends on the behavior we normally expect from the indicator code; if we normally report 2 weeks of data, filter to 2024-01-06 - 14 days through 2024-01-06.) + +Some datasets, such as those on healthdata.gov, provide metadata indicating when certain rows were updated. + +In other cases (such as datasetes that both have revisions _and_ don't track revisions), please discuss with the indicator stakeholder and consider [what you know about how the data works](#step-1-exploratory-analysis). + ### Statistical review The data produced by the new indicator needs to be sanity-checked. @@ -419,14 +472,17 @@ Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY- * Add module name to the `build` job in `.github/workflows/python-ci.yml`. This allows github actions to run on this indicator code, which includes unit tests and linting. -* Add top-level directory name to `indicator_list` in `Jenkinsfile`. +* Add module name to the ["Copy version to indicator directory" step](https://github.com/cmu-delphi/covidcast-indicators/blob/f01185767a9847d8082baf4f1e17be50a39047c2/.github/workflows/create-release.yml#L64) in `.github/workflows/create-release.yml`. +* Add top-level directory name to [`indicator_list` in `Jenkinsfile`](https://github.com/cmu-delphi/covidcast-indicators/blob/f01185767a9847d8082baf4f1e17be50a39047c2/Jenkinsfile#L13). This allows your code to be automatically deployed to staging after your branch is merged to main, and deployed to prod after `covidcast-indicators` is released. * Create `ansible/templates/{top_level_directory_name}-params-prod.json.j2` based on your `params.json.template` with some adjustment: * "export_dir": "/common/covidcast/receiving/{data-source-name}" * "log_filename": "/var/log/indicators/{top_level_directory_name}.log" +* Define any sensitive variables as "secrets" in the [Ansible `vars.yaml`](https://github.com/cmu-delphi/covidcast-indicators/blob/main/ansible/vars.yaml) and [vault](https://github.com/cmu-delphi/covidcast-indicators/blob/main/ansible/vault.yaml). +Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info. +* Add configs for Sir Complains-a-Lot ("sirCAL") alerting in sirCAL's [local](https://github.com/cmu-delphi/covidcast-indicators/blob/main/sir_complainsalot/params.json.template) and [Ansible](https://github.com/cmu-delphi/covidcast-indicators/blob/main/ansible/templates/sir_complainsalot-params-prod.json.j2) params templates. Pay attention to the receiving/export directory, as well as how you can store credentials in vault. -Refer to [this guide](https://docs.google.com/document/d/1Bbuvtoxowt7x2_8USx_JY-yTo-Av3oAFlhyG-vXGG-c/edit#heading=h.8kkoy8sx3t7f) for more vault info. ### Staging diff --git a/_template_python/pyproject.toml b/_template_python/pyproject.toml new file mode 100644 index 000000000..2f84aaa00 --- /dev/null +++ b/_template_python/pyproject.toml @@ -0,0 +1,42 @@ +[build-system] +requires = ["setuptools", "setuptools-scm>=8.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "delphi_NAME" +version = "0.1.0" +description = "SHORT DESCRIPTION" +readme = "README.md" +requires-python = "== 3.8.*" +license = { text = "MIT License" } +classifiers = [ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.8", + "License :: OSI Approved :: MIT License", +] +dependencies = [ + "delphi-utils", + "numpy", + "pandas>=1.1.0", +] + +[project.urls] +Homepage = "https://github.com/cmu-delphi/covidcast-indicators" + +[project.optional-dependencies] +dev = [ + "darker[isort]~=2.1.1", + "pylint==2.8.3", + "pytest", + "pydocstyle", + "pytest-cov", + "mock", + "moto~=4.2.14", + "requests-mock", + "freezegun", +] + +[tool.setuptools.packages.find] +where = ["."] +namespaces = true diff --git a/_template_python/setup.py b/_template_python/setup.py deleted file mode 100644 index d7bc44078..000000000 --- a/_template_python/setup.py +++ /dev/null @@ -1,30 +0,0 @@ -from setuptools import setup -from setuptools import find_packages - -required = [ - "covidcast", - "darker[isort]~=2.1.1", - "delphi-utils", - "numpy", - "pandas", - "pydocstyle", - "pylint==2.8.3", - "pytest-cov", - "pytest", -] - -setup( - name="delphi_NAME", - version="0.1.0", - description="SHORT DESCRIPTION", - author="", - author_email="", - url="https://github.com/cmu-delphi/covidcast-indicators", - install_requires=required, - classifiers=[ - "Development Status :: 5 - Production/Stable", - "Intended Audience :: Developers", - "Programming Language :: Python :: 3.8", - ], - packages=find_packages(), -) diff --git a/ansible/templates/nchs_mortality-params-prod.json.j2 b/ansible/templates/nchs_mortality-params-prod.json.j2 index dbd39598b..4b0d0c4f7 100644 --- a/ansible/templates/nchs_mortality-params-prod.json.j2 +++ b/ansible/templates/nchs_mortality-params-prod.json.j2 @@ -1,6 +1,7 @@ { "common": { "daily_export_dir": "./daily_receiving", + "backup_dir": "./raw_data_backups", "log_filename": "/var/log/indicators/nchs_mortality.log", "weekly_export_dir": "/common/covidcast/receiving/nchs-mortality" }, diff --git a/ansible/vault.yaml b/ansible/vault.yaml index bf979841a..1a674592b 100644 --- a/ansible/vault.yaml +++ b/ansible/vault.yaml @@ -1,265 +1,265 @@ $ANSIBLE_VAULT;1.1;AES256 -63313737356234663364633839353638393064663937643934333430643832363065646138333038 -3133663766613235613137383736383030636236323166370a366235353833306336336432376333 -61336662656336336530326463663761366232393063326365366561386636636165396430623165 -6131643963386231650aa356266613334313536396464653165 +65653530383462356536656235396339663937643835313232313132663832373536376330336230 +6135303831393331620adiff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index 92a03e6c5..9c15b221d 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -25,7 +25,7 @@ def retrieve_files(params, filedate, logger): if files["denom"] is None: ## download recent files from FTP server - logger.info("downloading recent files through SFTP") + logger.info("Downloading recent files through SFTP") download_counts(filedate, params["indicator"]["input_cache_dir"], params["indicator"]["ftp_conn"]) denom_file = "%s/%s_Counts_Products_Denom.dat.gz" % (params["indicator"]["input_cache_dir"],filedate) @@ -157,18 +157,20 @@ def run_module(params: Dict[str, Dict[str, Any]]): startdate, enddate = process_dates(params, startdate_dt, enddate_dt) - logger.info("generating signal and exporting to CSV", - first_sensor_date = startdate, - last_sensor_date = enddate, - drop_date = dropdate, - n_backfill_days = n_backfill_days, - n_waiting_days = n_waiting_days, - geos = params["indicator"]["geos"], - export_dir = params["common"]["export_dir"], - parallel = params["indicator"]["parallel"], - weekday = params["indicator"]["weekday"], - types = params["indicator"]["types"], - se = params["indicator"]["se"]) + logger.info( + "Generating signal and exporting to CSV", + first_sensor_date=startdate, + last_sensor_date=enddate, + drop_date=dropdate, + n_backfill_days=n_backfill_days, + n_waiting_days=n_waiting_days, + geos=params["indicator"]["geos"], + export_dir=params["common"]["export_dir"], + parallel=params["indicator"]["parallel"], + weekday=params["indicator"]["weekday"], + types=params["indicator"]["types"], + se=params["indicator"]["se"], + ) ## start generating stats = [] @@ -176,9 +178,9 @@ def run_module(params: Dict[str, Dict[str, Any]]): for numtype in params["indicator"]["types"]: for weekday in params["indicator"]["weekday"]: if weekday: - logger.info("starting weekday adj", geo = geo, numtype = numtype) + logger.info("Starting weekday adj", geo_type=geo, numtype=numtype) else: - logger.info("starting no adj", geo = geo, numtype = numtype) + logger.info("Starting no adj", geo_type=geo, numtype=numtype) su_inst = CHCSensorUpdater( startdate, enddate, @@ -211,7 +213,7 @@ def run_module(params: Dict[str, Dict[str, Any]]): ) stats.extend(more_stats) - logger.info("finished processing", geo = geo) + logger.info("Finished processing", geo_type=geo) elapsed_time_in_seconds = round(time.time() - start_time, 2) min_max_date = stats and min(s[0] for s in stats) diff --git a/changehc/delphi_changehc/sensor.py b/changehc/delphi_changehc/sensor.py index 0449f07df..9a1fd29e0 100644 --- a/changehc/delphi_changehc/sensor.py +++ b/changehc/delphi_changehc/sensor.py @@ -118,10 +118,10 @@ def fit(y_data, first_sensor_date, geo_id, logger, num_col="num", den_col="den") se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)') rate_data['se'] = se_valid - logger.debug("{0}: {1:.3f},[{2:.3f}]".format( - geo_id, rate_data['rate'][-1], rate_data['se'][-1] - )) - return {"geo_id": geo_id, - "rate": 100 * rate_data['rate'], - "se": 100 * rate_data['se'], - "incl": include} + logger.debug( + ".fit() DEBUG - last rate/se for geo", + geo_value=geo_id, + value=rate_data["rate"][-1], + se=rate_data["se"][-1], + ) + return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include} diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index edae85517..7c78dc020 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -41,7 +41,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa assert df[suspicious_se_mask].empty, " se contains suspiciously large values" assert not df["se"].isna().any(), " se contains nan values" if write_se: - logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name)) + logger.info("WARNING: WRITING SEs", filename=out_name) else: df["se"] = np.nan @@ -49,9 +49,7 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa suspicious_val_mask = df["val"].gt(90) if not df[suspicious_val_mask].empty: for geo in df.loc[suspicious_val_mask, "geo_id"]: - logger.warning("value suspiciously high, {0}: {1}".format( - geo, out_name - )) + logger.warning("Value suspiciously high", geo_value=geo, filename=out_name) dates = create_export_csv( df, @@ -62,10 +60,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa sensor=out_name, write_empty_days=True ) - logger.debug("wrote {0} rows for {1} {2}".format( - df.size, df["geo_id"].unique().size, geo_level - )) - logger.debug("wrote files to {0}".format(output_path)) + logger.debug("Wrote rows", num_rows=df.size, geo_type=geo_level, num_geo_ids=df["geo_id"].unique().size) + logger.debug("Wrote files", export_dir=output_path) return dates @@ -148,8 +144,9 @@ def geo_reindex(self, data): geo = self.geo gmpr = GeoMapper() if geo not in {"county", "state", "msa", "hrr", "nation", "hhs"}: - self.logger.error("{0} is invalid, pick one of 'county', " - "'state', 'msa', 'hrr', 'hss','nation'".format(geo)) + self.logger.error( + "Geo is invalid, pick one of 'county', " "'state', 'msa', 'hrr', 'hss','nation'", geo_type=geo + ) return False if geo == "county": data_frame = gmpr.fips_to_megacounty(data, @@ -224,7 +221,7 @@ def update_sensor(self, dfs.append(res) else: n_cpu = min(10, cpu_count()) - self.logger.debug("starting pool with {0} workers".format(n_cpu)) + self.logger.debug("Starting pool", n_workers=n_cpu) with Pool(n_cpu) as pool: pool_results = [] for geo_id, sub_data in data_frame.groupby(level=0,as_index=False): diff --git a/changehc/setup.py b/changehc/setup.py index d95beb771..f386e9613 100644 --- a/changehc/setup.py +++ b/changehc/setup.py @@ -6,6 +6,7 @@ "covidcast", "darker[isort]~=2.1.1", "delphi-utils", + "mock", "moto~=4.2.14", "numpy", "pandas", diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index 7ef25a608..d2e7ee2f3 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -9,6 +9,7 @@ import pandas as pd import numpy as np from boto3 import Session +from delphi_utils import get_structured_logger from moto import mock_s3 import pytest @@ -28,7 +29,7 @@ DENOM_FILEPATH = PARAMS["indicator"]["input_denom_file"] DROP_DATE = pd.to_datetime(PARAMS["indicator"]["drop_date"]) OUTPATH="test_data/" -TEST_LOGGER = logging.getLogger() +TEST_LOGGER = get_structured_logger() class TestCHCSensorUpdater: """Tests for updating the sensors.""" diff --git a/changehc/version.cfg b/changehc/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/changehc/version.cfg +++ b/changehc/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py index 5c9019035..20fd0d953 100644 --- a/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py +++ b/claims_hosp/delphi_claims_hosp/download_claims_ftp_files.py @@ -58,7 +58,7 @@ def download(ftp_credentials, out_path, logger, issue_date=None): current_time = issue_date if issue_date else datetime.datetime.now() seconds_in_day = 24 * 60 * 60 - logger.info("starting download", time=current_time) + logger.info("Starting download") # open client client = paramiko.SSHClient() diff --git a/claims_hosp/delphi_claims_hosp/indicator.py b/claims_hosp/delphi_claims_hosp/indicator.py index 4ad3ef7df..c5ac4e886 100644 --- a/claims_hosp/delphi_claims_hosp/indicator.py +++ b/claims_hosp/delphi_claims_hosp/indicator.py @@ -143,7 +143,10 @@ def fit(y_data, first_date, geo_id, num_col="num", den_col="den"): se_valid = valid_rates.eval('sqrt(rate * (1 - rate) / den)') rate_data['se'] = se_valid - logging.debug("%s: %05.3f, [%05.3f]", - geo_id, rate_data['rate'][-1], rate_data['se'][-1]) - return {"geo_id": geo_id, "rate": 100 * rate_data['rate'], - "se": 100 * rate_data['se'], "incl": include} + logging.debug( + ".fit() DEBUG - last rate/se for geo", + geo_value=geo_id, + value=rate_data["rate"][-1], + se=rate_data["se"][-1], + ) + return {"geo_id": geo_id, "rate": 100 * rate_data["rate"], "se": 100 * rate_data["se"], "incl": include} diff --git a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py index 0ab93ebcc..19a962884 100644 --- a/claims_hosp/delphi_claims_hosp/modify_claims_drops.py +++ b/claims_hosp/delphi_claims_hosp/modify_claims_drops.py @@ -57,5 +57,5 @@ def modify_and_write(data_path, logger, test_mode=False): dfs_list.append(dfs) else: dfs.to_csv(out_path, index=False) - logger.info(f"Wrote {out_path}") + logger.info("Wrote modified csv", filename=out_path) return files, dfs_list diff --git a/claims_hosp/delphi_claims_hosp/run.py b/claims_hosp/delphi_claims_hosp/run.py index 0d24192c0..3a8538bb2 100644 --- a/claims_hosp/delphi_claims_hosp/run.py +++ b/claims_hosp/delphi_claims_hosp/run.py @@ -130,9 +130,9 @@ def run_module(params, logger=None): for geo in params["indicator"]["geos"]: for weekday in params["indicator"]["weekday"]: if weekday: - logger.info("starting weekday adj", geo = geo) + logger.info("Starting weekday adj", geo_type=geo) else: - logger.info("starting no weekday adj", geo = geo) + logger.info("Starting no weekday adj", geo_type=geo) signal_name = Config.signal_weekday_name if weekday else Config.signal_name if params["indicator"]["write_se"]: @@ -140,7 +140,7 @@ def run_module(params, logger=None): "supply obfuscated prefix in params.json" signal_name = params["indicator"]["obfuscated_prefix"] + "_" + signal_name - logger.info("Updating signal name", signal_name = signal_name) + logger.info("Updating signal name", signal=signal_name) updater = ClaimsHospIndicatorUpdater( startdate, enddate, @@ -149,16 +149,16 @@ def run_module(params, logger=None): params["indicator"]["parallel"], weekday, params["indicator"]["write_se"], - signal_name + signal_name, + logger, ) updater.update_indicator( claims_file, params["common"]["export_dir"], - logger, ) max_dates.append(updater.output_dates[-1]) n_csv_export.append(len(updater.output_dates)) - logger.info("finished updating", geo = geo) + logger.info("Finished updating", geo_type=geo) # Remove all the raw files for fn in os.listdir(params["indicator"]["input_dir"]): diff --git a/claims_hosp/delphi_claims_hosp/update_indicator.py b/claims_hosp/delphi_claims_hosp/update_indicator.py index df3f3308f..5ba8ddd22 100644 --- a/claims_hosp/delphi_claims_hosp/update_indicator.py +++ b/claims_hosp/delphi_claims_hosp/update_indicator.py @@ -7,7 +7,6 @@ """ # standard packages -import logging from multiprocessing import Pool, cpu_count # third party @@ -28,8 +27,7 @@ class ClaimsHospIndicatorUpdater: # pylint: disable=too-many-instance-attributes, too-many-arguments # all variables are used - def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday, - write_se, signal_name): + def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday, write_se, signal_name, logger): """ Initialize updater for the claims-based hospitalization indicator. @@ -53,6 +51,7 @@ def __init__(self, startdate, enddate, dropdate, geo, parallel, weekday, # init in shift_dates, declared here for pylint self.burnindate, self.fit_dates, self.burn_in_dates, self.output_dates = \ [None] * 4 + self.logger = logger assert ( self.startdate > (Config.FIRST_DATA_DATE + Config.BURN_IN_PERIOD) @@ -114,9 +113,9 @@ def geo_reindex(self, data): elif self.geo == "hrr": data_frame = data # data is already adjusted in aggregation step above else: - logging.error( - "%s is invalid, pick one of 'county', 'state', 'msa', 'hrr', 'hhs', nation'", - self.geo) + self.logger.error( + "Geo is invalid, pick one of 'county', 'state', 'msa', 'hrr', 'hhs', nation'", geo_type=self.geo + ) return False unique_geo_ids = pd.unique(data_frame[self.geo]) @@ -133,7 +132,7 @@ def geo_reindex(self, data): data_frame.fillna(0, inplace=True) return data_frame - def update_indicator(self, input_filepath, outpath, logger): + def update_indicator(self, input_filepath, outpath): """ Generate and output indicator values. @@ -159,7 +158,7 @@ def update_indicator(self, input_filepath, outpath, logger): ["num"], Config.DATE_COL, [1, 1e5], - logger, + self.logger, ) if self.weekday else None @@ -182,7 +181,7 @@ def update_indicator(self, input_filepath, outpath, logger): valid_inds[geo_id] = np.array(res.loc[final_output_inds, "incl"]) else: n_cpu = min(Config.MAX_CPU_POOL, cpu_count()) - logging.debug("starting pool with %d workers", n_cpu) + self.logger.debug("Starting pool", n_workers=n_cpu) with Pool(n_cpu) as pool: pool_results = [] for geo_id, sub_data in data_frame.groupby(level=0, as_index=False): @@ -217,7 +216,7 @@ def update_indicator(self, input_filepath, outpath, logger): } self.write_to_csv(output_dict, outpath) - logging.debug("wrote files to %s", outpath) + self.logger.debug("Wrote files", export_dir=outpath) def write_to_csv(self, output_dict, output_path="./receiving"): """ @@ -229,8 +228,7 @@ def write_to_csv(self, output_dict, output_path="./receiving"): """ if self.write_se: - logging.info("========= WARNING: WRITING SEs TO %s =========", - self.signal_name) + self.logger.info("WARNING: WRITING SEs", signal=self.signal_name) geo_level = output_dict["geo_level"] dates = output_dict["dates"] @@ -255,7 +253,7 @@ def write_to_csv(self, output_dict, output_path="./receiving"): assert not np.isnan(val), "value for included value is nan" assert not np.isnan(se), "se for included rate is nan" if val > 90: - logging.warning("value suspicious, %s: %d", geo_id, val) + self.logger.warning("Value suspicious", geo_type=geo_level, geo_value=geo_id, value=val) assert se < 5, f"se suspicious, {geo_id}: {se}" if self.write_se: assert val > 0 and se > 0, "p=0, std_err=0 invalid" @@ -267,4 +265,4 @@ def write_to_csv(self, output_dict, output_path="./receiving"): "%s,%f,%s,%s,%s\n" % (geo_id, val, "NA", "NA", "NA")) out_n += 1 - logging.debug("wrote %d rows for %d %s", out_n, len(geo_ids), geo_level) + self.logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level, num_geo_ids=len(geo_ids)) diff --git a/claims_hosp/tests/test_update_indicator.py b/claims_hosp/tests/test_update_indicator.py index 1471be655..5ca527287 100644 --- a/claims_hosp/tests/test_update_indicator.py +++ b/claims_hosp/tests/test_update_indicator.py @@ -51,7 +51,8 @@ def test_shift_dates(self): self.parallel, self.weekday, self.write_se, - Config.signal_name + Config.signal_name, + TEST_LOGGER ) ## Test init assert updater.startdate.month == 2 @@ -72,7 +73,8 @@ def test_geo_reindex(self): self.parallel, self.weekday, self.write_se, - Config.signal_name + Config.signal_name, + TEST_LOGGER ) updater.shift_dates() data_frame = updater.geo_reindex(self.small_test_data.reset_index()) @@ -90,13 +92,13 @@ def test_update_indicator(self): self.parallel, self.weekday, self.write_se, - Config.signal_name + Config.signal_name, + TEST_LOGGER ) updater.update_indicator( DATA_FILEPATH, - td.name, - TEST_LOGGER + td.name ) assert len(os.listdir(td.name)) == len( @@ -112,7 +114,8 @@ def test_write_to_csv_results(self): self.parallel, self.weekday, self.write_se, - Config.signal_name + Config.signal_name, + TEST_LOGGER ) res0 = { @@ -192,7 +195,8 @@ def test_write_to_csv_with_se_results(self): self.parallel, True, True, - signal_name + signal_name, + TEST_LOGGER ) res0 = { @@ -243,7 +247,8 @@ def test_write_to_csv_wrong_results(self): self.parallel, self.weekday, self.write_se, - Config.signal_name + Config.signal_name, + TEST_LOGGER ) res0 = { diff --git a/claims_hosp/version.cfg b/claims_hosp/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/claims_hosp/version.cfg +++ b/claims_hosp/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py index 9d51768be..6cb42364a 100644 --- a/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py +++ b/doctor_visits/delphi_doctor_visits/download_claims_ftp_files.py @@ -59,7 +59,7 @@ def download(ftp_credentials, out_path, logger, issue_date=None): else: current_time = datetime.datetime.strptime(issue_date, "%Y-%m-%d").replace(hour=23, minute=59, second=59) - logger.info("starting download", time=current_time) + logger.info("Starting download") seconds_in_day = 24 * 60 * 60 # open client diff --git a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py b/doctor_visits/delphi_doctor_visits/modify_claims_drops.py index daed93d58..3be9393e4 100644 --- a/doctor_visits/delphi_doctor_visits/modify_claims_drops.py +++ b/doctor_visits/delphi_doctor_visits/modify_claims_drops.py @@ -48,5 +48,5 @@ def modify_and_write(f, logger, test_mode=False): if not test_mode: dfs.to_csv(out_path, index=False) - logger.info(f"Wrote {out_path}") + logger.info("Wrote modified csv", filename=out_path) return dfs diff --git a/doctor_visits/delphi_doctor_visits/patch.py b/doctor_visits/delphi_doctor_visits/patch.py index 32b6d308f..32c62dd6b 100644 --- a/doctor_visits/delphi_doctor_visits/patch.py +++ b/doctor_visits/delphi_doctor_visits/patch.py @@ -45,16 +45,19 @@ def patch(): start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") - logger.info(f"""Start patching {params["patch"]["patch_dir"]}""") - logger.info(f"""Start issue: {start_issue.strftime("%Y-%m-%d")}""") - logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""") + logger.info( + "Starting patching", + patch_directory=params["patch"]["patch_dir"], + start_issue=start_issue.strftime("%Y-%m-%d"), + end_issue=end_issue.strftime("%Y-%m-%d"), + ) makedirs(params["patch"]["patch_dir"], exist_ok=True) current_issue = start_issue while current_issue <= end_issue: - logger.info(f"""Running issue {current_issue.strftime("%Y-%m-%d")}""") + logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d")) params["patch"]["current_issue"] = current_issue.strftime("%Y-%m-%d") diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py index 3c941534a..2dccffc8c 100644 --- a/doctor_visits/delphi_doctor_visits/run.py +++ b/doctor_visits/delphi_doctor_visits/run.py @@ -88,32 +88,33 @@ def run_module(params, logger=None): # pylint: disable=too-many-statements startdate_dt = enddate_dt - timedelta(days=n_backfill_days) enddate = str(enddate_dt.date()) startdate = str(startdate_dt.date()) - logger.info("drop date:\t\t%s", dropdate) - logger.info("first sensor date:\t%s", startdate) - logger.info("last sensor date:\t%s", enddate) - logger.info("n_backfill_days:\t%s", n_backfill_days) - logger.info("n_waiting_days:\t%s", n_waiting_days) + + logger.info( + "Using params", + startdate=startdate, + enddate=enddate, + dropdate=dropdate, + n_backfill_days=n_backfill_days, + n_waiting_days=n_waiting_days, + export_dir=export_dir, + parallel=params["indicator"]["parallel"], + weekday=params["indicator"]["weekday"], + write_se=se, + prefix=prefix, + ) ## geographies geos = ["state", "msa", "hrr", "county", "hhs", "nation"] - - ## print out other vars - logger.info("outpath:\t\t%s", export_dir) - logger.info("parallel:\t\t%s", params["indicator"]["parallel"]) - logger.info("weekday:\t\t%s", params["indicator"]["weekday"]) - logger.info("write se:\t\t%s", se) - logger.info("obfuscated prefix:\t%s", prefix) - max_dates = [] n_csv_export = [] ## start generating for geo in geos: for weekday in params["indicator"]["weekday"]: if weekday: - logger.info("starting %s, weekday adj", geo) + logger.info("Starting with weekday adj", geo_type=geo) else: - logger.info("starting %s, no adj", geo) + logger.info("Starting with no adj", geo_type=geo) sensor = update_sensor( filepath=claims_file, startdate=startdate, @@ -137,8 +138,8 @@ def run_module(params, logger=None): # pylint: disable=too-many-statements write_to_csv(sensor, geo, se, out_name, logger, export_dir) max_dates.append(sensor.date.max()) n_csv_export.append(sensor.date.unique().shape[0]) - logger.debug(f"wrote files to {export_dir}") - logger.info("finished updating", geo = geo) + logger.debug("Wrote files", export_dir=export_dir) + logger.info("Finished updating", geo_type=geo) # Remove all the raw files for fn in os.listdir(params["indicator"]["input_dir"]): diff --git a/doctor_visits/delphi_doctor_visits/sensor.py b/doctor_visits/delphi_doctor_visits/sensor.py index b5a645ea8..91faa20cf 100644 --- a/doctor_visits/delphi_doctor_visits/sensor.py +++ b/doctor_visits/delphi_doctor_visits/sensor.py @@ -239,7 +239,7 @@ def fit(y_data, se[include] = np.sqrt( np.divide((new_rates[include] * (1 - new_rates[include])), den[include])) - logger.debug(f"{geo_id}: {new_rates[-1]:.3f},[{se[-1]:.3f}]") + logger.debug(".fit() DEBUG - last rate/se for geo", geo_value=geo_id, value=new_rates[-1], se=se[-1]) included_indices = [x for x in final_sensor_idxs if include[x]] diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py index 125c0df18..4cac1e81c 100644 --- a/doctor_visits/delphi_doctor_visits/update_sensor.py +++ b/doctor_visits/delphi_doctor_visits/update_sensor.py @@ -34,7 +34,7 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, outpu output_path: outfile path to write the csv (default is current directory) """ if se: - logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========") + logger.info("WARNING: WRITING SEs", filename=out_name) out_n = 0 for d in set(output_df["date"]): @@ -64,7 +64,7 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, outpu outfile.write( "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA")) out_n += 1 - logger.debug(f"wrote {out_n} rows for {geo_level}") + logger.debug("Wrote rows", num_rows=out_n, geo_type=geo_level) def update_sensor( @@ -177,7 +177,7 @@ def update_sensor( else: n_cpu = min(10, cpu_count()) - logger.debug(f"starting pool with {n_cpu} workers") + logger.debug("Starting pool", n_workers=n_cpu) with Pool(n_cpu) as pool: pool_results = [] diff --git a/doctor_visits/version.cfg b/doctor_visits/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/doctor_visits/version.cfg +++ b/doctor_visits/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/google_symptoms/delphi_google_symptoms/date_utils.py b/google_symptoms/delphi_google_symptoms/date_utils.py index ebfe5109a..2ad6244e9 100644 --- a/google_symptoms/delphi_google_symptoms/date_utils.py +++ b/google_symptoms/delphi_google_symptoms/date_utils.py @@ -98,7 +98,7 @@ def generate_num_export_days(params: Dict, logger) -> [int]: expected_date_diff += global_max_expected_lag if latest_date_diff > expected_date_diff: - logger.info(f"Missing dates from: {to_datetime(min(gs_metadata.max_time)).date()}") + logger.info("Missing date", date=to_datetime(min(gs_metadata.max_time)).date()) num_export_days = expected_date_diff diff --git a/google_symptoms/delphi_google_symptoms/patch.py b/google_symptoms/delphi_google_symptoms/patch.py index 01d099c4f..85df89394 100755 --- a/google_symptoms/delphi_google_symptoms/patch.py +++ b/google_symptoms/delphi_google_symptoms/patch.py @@ -58,16 +58,19 @@ def patch(params): issue_date = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d") end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d") - logger.info(f"""Start patching {params["patch"]["patch_dir"]}""") - logger.info(f"""Start issue: {issue_date.strftime("%Y-%m-%d")}""") - logger.info(f"""End issue: {end_issue.strftime("%Y-%m-%d")}""") + logger.info( + "Starting patching", + patch_directory=params["patch"]["patch_dir"], + start_issue=issue_date.strftime("%Y-%m-%d"), + end_issue=end_issue.strftime("%Y-%m-%d"), + ) makedirs(params["patch"]["patch_dir"], exist_ok=True) patch_dates = generate_patch_dates(params) while issue_date <= end_issue: - logger.info(f"""Running issue {issue_date.strftime("%Y-%m-%d")}""") + logger.info("Running issue", issue_date=issue_date.strftime("%Y-%m-%d")) # Output dir setup current_issue_yyyymmdd = issue_date.strftime("%Y%m%d") diff --git a/google_symptoms/delphi_google_symptoms/run.py b/google_symptoms/delphi_google_symptoms/run.py index 8303a9a8a..8ad1d6d10 100644 --- a/google_symptoms/delphi_google_symptoms/run.py +++ b/google_symptoms/delphi_google_symptoms/run.py @@ -80,10 +80,8 @@ def run_module(params, logger=None): if len(df_pull) == 0: continue for metric, smoother in product(COMBINED_METRIC, SMOOTHERS): - logger.info("generating signal and exporting to CSV", - geo_res=geo_res, - metric=metric, - smoother=smoother) + sensor_name = "_".join([smoother, "search"]) + logger.info("Generating signal and exporting to CSV", geo_type=geo_res, signal=f"{metric}_{sensor_name}") df = df_pull df["val"] = df[metric].astype(float) df["val"] = df[["geo_id", "val"]].groupby( @@ -94,9 +92,8 @@ def run_module(params, logger=None): # Drop early entries where data insufficient for smoothing df = df.loc[~df["val"].isnull(), :] df = df.reset_index() - sensor_name = "_".join([smoother, "search"]) if len(df) == 0: - logger.info("No data for %s_%s_%s", geo_res, metric.lower(), sensor_name) + logger.info("No data for signal", geo_type=geo_res, signal=f"{metric}_{sensor_name}") continue exported_csv_dates = create_export_csv( df, diff --git a/google_symptoms/version.cfg b/google_symptoms/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/google_symptoms/version.cfg +++ b/google_symptoms/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/hhs_hosp/version.cfg b/hhs_hosp/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/hhs_hosp/version.cfg +++ b/hhs_hosp/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/jenkins/build-indicator.sh b/jenkins/build-indicator.sh index 3d2315741..88f79424a 100755 --- a/jenkins/build-indicator.sh +++ b/jenkins/build-indicator.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash # -# Jenkins build +# Jenkins build # set -eo pipefail @@ -17,4 +17,6 @@ source env/bin/activate pip install pip==23.0.1 --retries 10 --timeout 20 pip install numpy --retries 10 --timeout 20 pip install ../_delphi_utils_python/. --retries 10 --timeout 20 -[ ! -f setup.py ] || pip install . --retries 10 --timeout 20 +if [ -f setup.py ] || [ -f pyproject.toml ]; then + pip install . --retries 10 --timeout 20 +fi diff --git a/nchs_mortality/delphi_nchs_mortality/pull.py b/nchs_mortality/delphi_nchs_mortality/pull.py index 18bbfd59a..ad54e457a 100644 --- a/nchs_mortality/delphi_nchs_mortality/pull.py +++ b/nchs_mortality/delphi_nchs_mortality/pull.py @@ -1,15 +1,17 @@ # -*- coding: utf-8 -*- """Functions for pulling NCHS mortality data API.""" +import logging from typing import Optional import numpy as np import pandas as pd +from delphi_utils import create_backup_csv +from delphi_utils.geomap import GeoMapper from sodapy import Socrata -from delphi_utils.geomap import GeoMapper +from .constants import METRICS, NEWLINE, RENAME -from .constants import METRICS, RENAME, NEWLINE def standardize_columns(df): """Rename columns to comply with a standard set. @@ -22,7 +24,13 @@ def standardize_columns(df): return df.rename(columns=dict(rename_pairs)) -def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None): +def pull_nchs_mortality_data( + socrata_token: str, + backup_dir: str, + custom_run: bool, + logger: Optional[logging.Logger] = None, + test_file: Optional[str] = None, +): """Pull the latest NCHS Mortality data, and conforms it into a dataset. The output dataset has: @@ -40,6 +48,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None ---------- socrata_token: str My App Token for pulling the NCHS mortality data + backup_dir: str + Directory to which to save raw backup data + custom_run: bool + Flag indicating if the current run is a patch. If so, don't save any data to disk test_file: Optional[str] When not null, name of file from which to read test data @@ -60,6 +72,10 @@ def pull_nchs_mortality_data(socrata_token: str, test_file: Optional[str] = None client = Socrata("data.cdc.gov", socrata_token) results = client.get("r8kw-7aab", limit=10**10) df = pd.DataFrame.from_records(results) + + create_backup_csv(df, backup_dir, custom_run=custom_run, logger=logger) + + if not test_file: # drop "By Total" rows df = df[df["group"].transform(str.lower) == "by week"] diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index 50ce46cfb..4e88e9d61 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -59,6 +59,8 @@ def run_module(params: Dict[str, Any]): days=date.today().weekday() + 2) export_start_date = export_start_date.strftime('%Y-%m-%d') daily_export_dir = params["common"]["daily_export_dir"] + backup_dir = params["common"]["backup_dir"] + custom_run = params["common"].get("custom_run", False) socrata_token = params["indicator"]["socrata_token"] test_file = params["indicator"].get("test_file", None) @@ -70,7 +72,9 @@ def run_module(params: Dict[str, Any]): daily_arch_diff.update_cache() stats = [] - df_pull = pull_nchs_mortality_data(socrata_token, test_file) + df_pull = pull_nchs_mortality_data( + socrata_token, backup_dir, custom_run=custom_run, test_file=test_file, logger=logger + ) for metric in METRICS: for geo in ["state", "nation"]: if metric == 'percent_of_expected_deaths': diff --git a/nchs_mortality/params.json.template b/nchs_mortality/params.json.template index ed16c620c..2e829de24 100644 --- a/nchs_mortality/params.json.template +++ b/nchs_mortality/params.json.template @@ -2,7 +2,8 @@ "common": { "daily_export_dir": "./daily_receiving", "weekly_export_dir": "./receiving", - "log_filename": "/var/log/indicators/nchs_mortality.log", + "backup_dir": "./raw_data_backups", + "log_filename": "./nchs_mortality.log", "log_exceptions": false }, "indicator": { diff --git a/nchs_mortality/raw_data_backups/.gitignore b/nchs_mortality/raw_data_backups/.gitignore new file mode 100644 index 000000000..552154e09 --- /dev/null +++ b/nchs_mortality/raw_data_backups/.gitignore @@ -0,0 +1,120 @@ +# You should hard commit a prototype for this file, but we +# want to avoid accidental adding of API tokens and other +# private data parameters +params.json + +# Do not commit output files +receiving/*.csv + +# Remove macOS files +.DS_Store + +# virtual environment +dview/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +coverage.xml +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/nchs_mortality/setup.py b/nchs_mortality/setup.py index 3fe354ba4..142db6f35 100644 --- a/nchs_mortality/setup.py +++ b/nchs_mortality/setup.py @@ -2,10 +2,12 @@ from setuptools import find_packages required = [ + "boto3", "darker[isort]~=2.1.1", "delphi-utils", "epiweeks", "freezegun", + "moto~=4.2.14", "numpy", "pandas", "pydocstyle", diff --git a/nchs_mortality/tests/conftest.py b/nchs_mortality/tests/conftest.py index 6ad0f9c59..383d1c782 100644 --- a/nchs_mortality/tests/conftest.py +++ b/nchs_mortality/tests/conftest.py @@ -14,8 +14,10 @@ PARAMS = { "common": { + "custom_run": True, "daily_export_dir": "./daily_receiving", - "weekly_export_dir": "./receiving" + "weekly_export_dir": "./receiving", + "backup_dir": "./raw_data_backups" }, "indicator": { "export_start_date": "2020-04-11", diff --git a/nchs_mortality/tests/raw_data_backups/.gitignore b/nchs_mortality/tests/raw_data_backups/.gitignore new file mode 100644 index 000000000..2b7efbb36 --- /dev/null +++ b/nchs_mortality/tests/raw_data_backups/.gitignore @@ -0,0 +1,2 @@ +*.csv +*.gz \ No newline at end of file diff --git a/nchs_mortality/tests/test_pull.py b/nchs_mortality/tests/test_pull.py index fa58b04a5..4f18210f6 100644 --- a/nchs_mortality/tests/test_pull.py +++ b/nchs_mortality/tests/test_pull.py @@ -1,3 +1,4 @@ +import os import pytest import pandas as pd @@ -34,7 +35,7 @@ def test_standardize_columns(self): pd.testing.assert_frame_equal(expected, df) def test_good_file(self): - df = pull_nchs_mortality_data(SOCRATA_TOKEN, "test_data.csv") + df = pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "test_data.csv") # Test columns assert ( @@ -90,9 +91,20 @@ def test_good_file(self): def test_bad_file_with_inconsistent_time_col(self): with pytest.raises(ValueError): pull_nchs_mortality_data( - SOCRATA_TOKEN, "bad_data_with_inconsistent_time_col.csv" + SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_inconsistent_time_col.csv" ) def test_bad_file_with_missing_cols(self): with pytest.raises(ValueError): - pull_nchs_mortality_data(SOCRATA_TOKEN, "bad_data_with_missing_cols.csv") + pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = "", custom_run = True, test_file = "bad_data_with_missing_cols.csv") + + def test_backup_today_data(self): + today = pd.Timestamp.today().strftime("%Y%m%d") + backup_dir = "./raw_data_backups" + pull_nchs_mortality_data(SOCRATA_TOKEN, backup_dir = backup_dir, custom_run = False, test_file = "test_data.csv") + backup_file = f"{backup_dir}/{today}.csv.gz" + backup_df = pd.read_csv(backup_file) + source_df = pd.read_csv("test_data/test_data.csv") + pd.testing.assert_frame_equal(source_df, backup_df) + if os.path.exists(backup_file): + os.remove(backup_file) diff --git a/nchs_mortality/version.cfg b/nchs_mortality/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/nchs_mortality/version.cfg +++ b/nchs_mortality/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/nssp/delphi_nssp/constants.py b/nssp/delphi_nssp/constants.py index ddd2e74b8..9b98d2012 100644 --- a/nssp/delphi_nssp/constants.py +++ b/nssp/delphi_nssp/constants.py @@ -6,6 +6,7 @@ "nation", "state", "county", + "hhs", ] SIGNALS_MAP = { diff --git a/nssp/delphi_nssp/run.py b/nssp/delphi_nssp/run.py index 7c5a3ffac..b22d03c20 100644 --- a/nssp/delphi_nssp/run.py +++ b/nssp/delphi_nssp/run.py @@ -90,7 +90,7 @@ def run_module(params): for geo in GEOS: df = df_pull.copy() df["val"] = df[signal] - logger.info("Generating signal and exporting to CSV", metric=signal) + logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=signal) if geo == "nation": df = df[df["geography"] == "United States"] df["geo_id"] = "us" @@ -111,6 +111,14 @@ def run_module(params): df = geo_mapper.add_geocode(df, "fips", "msa", from_col="fips", new_col="geo_id") df = geo_mapper.aggregate_by_weighted_sum(df, "geo_id", "val", "timestamp", "population") df = df.rename(columns={"weighted_val": "val"}) + elif geo == "hhs": + df = df[(df["county"] == "All") & (df["geography"] != "United States")] + df = df[["geography", "val", "timestamp"]] + df = geo_mapper.add_population_column(df, geocode_type="state_name", geocode_col="geography") + df = geo_mapper.add_geocode(df, "state_name", "state_code", from_col="state_name") + df = geo_mapper.add_geocode(df, "state_code", "hhs", from_col="state_code", new_col="geo_id") + df = geo_mapper.aggregate_by_weighted_sum(df, "geo_id", "val", "timestamp", "population") + df = df.rename(columns={"weighted_val": "val"}) else: df = df[df["county"] != "All"] df["geo_id"] = df["fips"] diff --git a/nwss_wastewater/delphi_nwss/run.py b/nwss_wastewater/delphi_nwss/run.py index 378849ba5..60bfc84c7 100644 --- a/nwss_wastewater/delphi_nwss/run.py +++ b/nwss_wastewater/delphi_nwss/run.py @@ -146,7 +146,7 @@ def run_module(params): df = generate_weights(df, sensor) for geo in GEOS: - logger.info("Generating signal and exporting to CSV", metric=sensor) + logger.info("Generating signal and exporting to CSV", geo_type=geo, signal=sensor) if geo == "nation": agg_df = weighted_nation_sum(df, sensor) else: diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py index d9f23f2ec..560f89456 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/pull.py +++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py @@ -56,7 +56,7 @@ def get_from_s3(start_date, end_date, bucket, logger): seen_files = set() for search_date in [start_date + timedelta(days=x) for x in range(n_days)]: if search_date in s3_files.keys(): - logger.info(f"Pulling data received on {search_date.date()}") + logger.info("Pulling data received on date", search_date=search_date.date()) # Fetch data received on the same day for fn in s3_files[search_date]: @@ -110,11 +110,11 @@ def fix_date(df, logger): df.insert(2, "timestamp", df["TestDate"]) mask = df["TestDate"] <= df["StorageDate"] - logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f}% of unusual data") + logger.info("Removing unusual data", percent=round((len(df) - np.sum(mask)) * 100 / len(df), 2)) df = df[mask] mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90) - logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f}% of outdated data") + logger.info("Fixing outdated data", percent=round((np.sum(mask) * 100 / len(df)), 2)) df["timestamp"].values[mask] = df["StorageDate"].values[mask] return df diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index a59e0c101..e6974b6aa 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -164,7 +164,7 @@ def run_module(params: Dict[str, Any]): n_cpu = min(8, cpu_count()) # for parallelization with pool_and_threadedlogger(logger, n_cpu) as (pool, threaded_logger): # for using loggers in multiple threads - logger.info("Parallelizing sensor generation", n_cpu=n_cpu) + logger.info("Parallelizing sensor generation", n_workers=n_cpu) pool_results = [] for geo_res in NONPARENT_GEO_RESOLUTIONS: geo_data, res_key = geo_map(geo_res, data) diff --git a/quidel_covidtest/setup.py b/quidel_covidtest/setup.py index c2791930f..82c80832a 100644 --- a/quidel_covidtest/setup.py +++ b/quidel_covidtest/setup.py @@ -2,6 +2,7 @@ from setuptools import find_packages required = [ + "boto3", "covidcast", "darker[isort]~=2.1.1", "delphi-utils", @@ -14,7 +15,7 @@ "pylint==2.8.3", "pytest-cov", "pytest", - "xlrd==1.2.0", + "xlrd==1.2.0", # needed by Pandas to read Excel files ] setup( diff --git a/quidel_covidtest/version.cfg b/quidel_covidtest/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/quidel_covidtest/version.cfg +++ b/quidel_covidtest/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56 diff --git a/sir_complainsalot/version.cfg b/sir_complainsalot/version.cfg index f5c28d2cd..ed4b085f1 100644 --- a/sir_complainsalot/version.cfg +++ b/sir_complainsalot/version.cfg @@ -1 +1 @@ -current_version = 0.3.55 +current_version = 0.3.56