Merge branch 'main' into automate-hospital-admission-patch
aysim319 committed Oct 30, 2024
2 parents 1666e0c + 3450dfc commit 73a9063
Showing 66 changed files with 905 additions and 546 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.3.55
current_version = 0.3.56
commit = True
message = chore: bump covidcast-indicators to {new_version}
tag = False
13 changes: 8 additions & 5 deletions .github/CONTRIBUTING.md
@@ -12,7 +12,9 @@ The production branch is configured to automatically deploy to our production en

* everything else

All other branches are development branches. We don't enforce a naming policy.
All other branches are development branches. We don't enforce a naming policy, but it is recommended to prefix all branches you create with your name, username, or initials (e.g. `username/branch-name`).

We don't forbid force-pushing, but please keep it to a minimum, and be careful when using it on a branch that others may be modifying at the same time.

## Issues

@@ -29,7 +31,7 @@ So, how does one go about developing a pipeline for a new data source?
### tl;dr

1. Create your new indicator branch from `main`.
2. Build it using the appropriate template, following the guidelines in the included README.md and REVIEW.md files.
2. Build it using the [indicator template](https://github.com/cmu-delphi/covidcast-indicators/tree/main/_template_python), following the guidelines in the included README.md, REVIEW.md, and INDICATOR_DEV_GUIDE.md files.
3. Make some stuff!
4. When your stuff works, push your development branch to remote, and open a PR against `main` for review.
5. Once your PR has been merged, consult with a platform engineer for the remaining production setup needs. They will create a deployment workflow for your indicator including any necessary production parameters. Production secrets are encrypted in the Ansible vault. This workflow will be tested in staging by admins, who will consult you about any problems they encounter.
@@ -50,7 +52,7 @@ git checkout -b dev-my-feature-branch

### Creating your indicator

Create a directory for your new indicator by making a copy of `_template_r` or `_template_python` depending on the programming language you intend to use. If using Python, add the name of the directory to the list found in `jobs > build > strategy > matrix > packages` in `.github/workflows/python-ci.yml`, which will enable automated checks for your indicator when you make PRs. The template copies of `README.md` and `REVIEW.md` include the minimum requirements for code structure, documentation, linting, testing, and method of configuration. Beyond that, we don't have any established restrictions on implementation; you can look at other existing indicators see some examples of code layout, organization, and general approach.
Create a directory for your new indicator by making a copy of `_template_python`. (We also make a `_template_r` available, but R should only be used as a last resort, due to complications using it in production.) Add the name of the directory to the list found in `jobs > build > strategy > matrix > packages` in `.github/workflows/python-ci.yml`, which will enable automated checks for your indicator when you make PRs. The template copies of `README.md` and `REVIEW.md` include the minimum requirements for code structure, documentation, linting, testing, and method of configuration. Beyond that, we don't have any established restrictions on implementation; you can look at other existing indicators to see some examples of code layout, organization, and general approach.
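As a sketch, the matrix addition described above looks roughly like the following (the surrounding keys are paraphrased from the `jobs > build > strategy > matrix > packages` path, not a verbatim copy of `.github/workflows/python-ci.yml`):

```yaml
jobs:
  build:
    strategy:
      matrix:
        packages:
          # ...existing indicator directories...
          - my_new_indicator  # add your new directory name here
```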

* Consult your peers with questions! :handshake:

@@ -62,7 +64,7 @@ Once you have something that runs locally and passes tests you set up your remot
git push -u origin dev-my-feature-branch
```

You can then draft public API documentation for people who would fetch this
You can then draft [public API documentation](https://cmu-delphi.github.io/delphi-epidata/) for people who would fetch this
data from the API. Public API documentation is kept in the delphi-epidata
repository, and there is a [template Markdown
file](https://github.com/cmu-delphi/delphi-epidata/blob/main/docs/api/covidcast-signals/_source-template.md)
@@ -104,7 +106,8 @@ We use a branch-based git workflow coupled with [Jenkins](https://www.jenkins.io
* Package - Tar and gzip the built environment.
* Deploy - Trigger an Ansible playbook to place the built package onto the runtime host, place any necessary production configuration, and adjust the runtime environment (if necessary).

There are several additional Jenkins-specific files that will need to be created for each indicator, as well as some configuration additions to the runtime host. It will be important to pair with a platform engineer to prepare the necessary production environment needs, test the workflow, validate on production, and ultimately sign off on a production release.
There are several additional Jenkins-specific files that will need to be created for each indicator, as well as some configuration additions to the runtime host.
It will be important to pair with a platform engineer to prepare the necessary production environment needs, test the workflow, validate on production, and ultimately sign off on a production release.
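The Package and Deploy stages described above might be sketched like this (stage bodies, file names, and playbook variables are assumptions for illustration, not the repo's actual Jenkinsfile):

```groovy
// Illustrative stage shapes only; real stages carry repo-specific steps.
stage('Package') {
  steps {
    // Tar and gzip the built environment.
    sh 'tar -czf my_indicator.tar.gz env/'
  }
}
stage('Deploy') {
  steps {
    // Hand the package to Ansible for placement on the runtime host.
    sh 'ansible-playbook deploy.yaml --extra-vars "indicator=my_indicator"'
  }
}
```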

### Preparing container images of indicators

2 changes: 1 addition & 1 deletion .github/workflows/publish-release.yml
@@ -86,7 +86,7 @@ jobs:
- name: Release
run: |
make release
- uses: actions/upload-artifact@v2
- uses: actions/upload-artifact@v4
with:
name: delphi_utils
path: _delphi_utils_python/dist/*.tar.gz
2 changes: 1 addition & 1 deletion .github/workflows/python-ci.yml
@@ -51,7 +51,7 @@ jobs:
with:
python-version: 3.8
cache: "pip"
cache-dependency-path: "setup.py"
cache-dependency-path: "pyproject.toml"
- name: Install testing dependencies
run: |
python -m pip install --upgrade pip
4 changes: 4 additions & 0 deletions Jenkinsfile
@@ -18,6 +18,10 @@ def deploy_production = [:]

pipeline {
agent any
environment {
// Set the PATH variable to include the pyenv shims directory.
PATH = "/var/lib/jenkins/.pyenv/shims:${env.PATH}"
}
stages {
stage('Build dev/feature branch') {
when {
4 changes: 2 additions & 2 deletions _delphi_utils_python/.bumpversion.cfg
@@ -1,9 +1,9 @@
[bumpversion]
current_version = 0.3.24
current_version = 0.3.25
commit = True
message = chore: bump delphi_utils to {new_version}
tag = False

[bumpversion:file:setup.py]
[bumpversion:file:pyproject.toml]

[bumpversion:file:delphi_utils/__init__.py]
2 changes: 1 addition & 1 deletion _delphi_utils_python/DEVELOP.md
@@ -9,7 +9,7 @@ To install the module in your default version of Python, run the
following from this directory:

```
pip install .
pip install -e '.[dev]'
```

As described in each of the indicator code directories, you will want to install
9 changes: 5 additions & 4 deletions _delphi_utils_python/Makefile
@@ -6,12 +6,12 @@ venv:
install: venv
. env/bin/activate; \
pip install wheel ; \
pip install -e .
pip install -e '.[dev]'

install-ci: venv
. env/bin/activate; \
pip install wheel ; \
pip install .
pip install 'build[virtualenv]' pylint pytest pydocstyle wheel twine ; \
pip install '.[dev]'

lint:
. env/bin/activate; pylint delphi_utils --rcfile=../pyproject.toml
@@ -30,4 +30,5 @@ clean:

release: lint test
. env/bin/activate ; \
python setup.py sdist bdist_wheel
pip install 'build[virtualenv]' ; \
python -m build --sdist --wheel
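The switch from `python setup.py sdist bdist_wheel` to `python -m build` matches the move from `setup.py` to `pyproject.toml` seen elsewhere in this commit. A minimal `pyproject.toml` supporting this flow might look like the following (illustrative sketch only, not the repo's actual file; the listed dev tools are taken from the `install-ci` target above):

```toml
[build-system]
requires = ["setuptools>=61", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "delphi_utils"
version = "0.3.25"

[project.optional-dependencies]
dev = ["pylint", "pytest", "pydocstyle", "twine"]
```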
11 changes: 10 additions & 1 deletion _delphi_utils_python/README.md
@@ -22,13 +22,22 @@ Source code can be found here:

## Logger Usage

To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic, the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier), and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument to the logger call (for use in filtering, thresholding, grouping, visualization, etc).

### Commonly used argument names:
- data_source
- geo_type
- signal
- issue_date
- filename
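A minimal stand-in (hypothetical, not the delphi_utils API) illustrating the guidance above — keep the event string static and pass each dynamic value as a named argument:

```python
def structured(event, **fields):
    """Collect a log call's static event string and named fields into one record."""
    return {"event": event, **fields}

# Hard to filter on in Elastic: the event string changes with every file.
bad = structured("Exported quality_county_20241030.csv")

# Easy to filter, group, and threshold on: static event, dynamic named fields.
good = structured(
    "Exported file",
    signal="quality",
    geo_type="county",
    issue_date="20241030",
    filename="quality_county_20241030.csv",
)
print(good["event"])  # -> Exported file
```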

Single-thread usage.

```py
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger('my_logger')
logger.info('Hello, world!')
logger.info('Hello', name='World')
```

Multi-thread usage.
15 changes: 7 additions & 8 deletions _delphi_utils_python/delphi_utils/__init__.py
@@ -4,15 +4,14 @@
from __future__ import absolute_import

from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
from .export import create_export_csv
from .utils import read_params

from .slack_notifier import SlackNotifier
from .logger import get_structured_logger
from .export import create_backup_csv, create_export_csv
from .geomap import GeoMapper
from .smooth import Smoother
from .signal import add_prefix
from .logger import get_structured_logger
from .nancodes import Nans
from .signal import add_prefix
from .slack_notifier import SlackNotifier
from .smooth import Smoother
from .utils import read_params
from .weekday import Weekday

__version__ = "0.3.24"
__version__ = "0.3.25"
81 changes: 76 additions & 5 deletions _delphi_utils_python/delphi_utils/export.py
@@ -1,16 +1,18 @@
"""Export data in the format expected by the Delphi API."""
# -*- coding: utf-8 -*-
import gzip
import logging
from datetime import datetime
from os.path import join
from os.path import getsize, join
from typing import Optional
import logging

from epiweeks import Week
import numpy as np
import pandas as pd
from epiweeks import Week

from .nancodes import Nans


def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
columns = ["val", "se", "sample_size"]
@@ -22,8 +24,10 @@ def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
for mask in masks:
if not logger is None and df.loc[mask].size > 0:
logger.info(
"Filtering contradictory missing code in " +
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
"Filtering contradictory missing code",
sensor=sensor,
metric=metric,
date=date.strftime(format="%Y-%m-%d"),
)
df = df.loc[~mask]
elif logger is None and df.loc[mask].size > 0:
@@ -130,3 +134,70 @@ def create_export_csv(
export_df = export_df.sort_values(by="geo_id")
export_df.to_csv(export_file, index=False, na_rep="NA")
return dates


def create_backup_csv(
df: pd.DataFrame,
backup_dir: str,
custom_run: bool,
issue: Optional[str] = None,
geo_res: Optional[str] = None,
sensor: Optional[str] = None,
metric: Optional[str] = None,
logger: Optional[logging.Logger] = None,
):
"""Save data for use as a backup.
This function is meant to save raw data fetched from data sources.
Therefore, it avoids manipulating the data as much as possible to
preserve the input.
When only required arguments are passed, data will be saved to a file of
the format `<export_dir>/<today's date as YYYYMMDD>.csv`. Optional arguments
should be passed if the source data is fetched from different tables or
in batches by signal, geo, etc.
Parameters
----------
df: pd.DataFrame
Columns: geo_id, timestamp, val, se, sample_size
backup_dir: str
Backup directory
custom_run: bool
Flag indicating if the current run is a patch, or other run where
backups aren't needed. If so, don't save any data to disk
issue: Optional[str]
The date the data was fetched, in YYYYMMDD format. Defaults to "today"
if not provided
geo_res: Optional[str]
Geographic resolution of the data
sensor: Optional[str]
Sensor that has been calculated (cumulative_counts vs new_counts)
metric: Optional[str]
Metric we are considering, if any.
logger: Optional[logging.Logger]
Pass a logger object here to log information about name and size of the backup file.
"""
if not custom_run:
# Label the file with today's date (the date the data was fetched).
if not issue:
issue = datetime.today().strftime("%Y%m%d")

backup_filename = [issue, geo_res, metric, sensor]
backup_filename = "_".join(filter(None, backup_filename)) + ".csv.gz"
backup_file = join(backup_dir, backup_filename)

with gzip.open(backup_file, "wt", newline="") as f:
df.to_csv(f, index=False, na_rep="NA")

if logger:
logger.info(
"Backup file created",
backup_file=backup_file,
backup_size=getsize(backup_file),
)
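The backup filename convention in `create_backup_csv` can be exercised in isolation; `build_backup_filename` below is a hypothetical helper mirroring the `"_".join(filter(None, ...))` logic above, not part of delphi_utils:

```python
from datetime import datetime


def build_backup_filename(issue=None, geo_res=None, metric=None, sensor=None):
    """Mirror the name-building logic of create_backup_csv (sketch)."""
    if not issue:
        # Label the file with today's date (the date the data was fetched).
        issue = datetime.today().strftime("%Y%m%d")
    parts = [issue, geo_res, metric, sensor]
    # filter(None, ...) drops any optional parts that were not provided.
    return "_".join(filter(None, parts)) + ".csv.gz"


print(build_backup_filename(issue="20241030", geo_res="county", sensor="confirmed"))
# -> 20241030_county_confirmed.csv.gz
```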
3 changes: 1 addition & 2 deletions _delphi_utils_python/delphi_utils/flash_eval/eval_day.py
@@ -153,8 +153,7 @@ def output(evd_ranking, day, lag, signal, logger):
p_text += f"\t{start_link}|*{index}*, {'{:.2f}'.format(value)}>\n"
else:
break
name = f"Signal: {signal} Lag: {lag}"
logger.info(name, payload=p_text)
logger.info("FLaSH: worth inspecting", signal=signal, lag=lag, payload=p_text)


def evd_ranking_fn(ts_streams, EVD_max, EVD_min):
6 changes: 4 additions & 2 deletions _delphi_utils_python/delphi_utils/geomap.py
@@ -443,7 +443,7 @@ def add_population_column(
---------
data: pd.DataFrame
The dataframe with a FIPS code column.
geocode_type: {"fips", "zip"}
geocode_type:
The type of the geocode contained in geocode_col.
geocode_col: str, default None
The name of the column containing the geocodes. If None, uses the geocode_type
@@ -671,8 +671,10 @@ def aggregate_by_weighted_sum(
to a from_geo, e.g. "wastewater collection site").
to_geo: str
The column name of the geocode to aggregate to.
sensor: str
sensor_col: str
The column name of the sensor to aggregate.
time_col: str
The column name of the timestamp to aggregate over.
population_column: str
The column name of the population to weight the sensor by.
10 changes: 5 additions & 5 deletions _delphi_utils_python/delphi_utils/logger.py
@@ -1,11 +1,11 @@
"""Structured logger utility for creating JSON logs.
See the delphi_utils README.md for usage examples.
To make our structured logging as useful as it can be, particularly within the context of how we use logs in Elastic,
the `event` argument (typically the first unnamed arg) should be a static string (to make filtering easier),
and each dynamic/varying value should be specified in an individual meaningfully- and consistently-named argument
to the logger call (for use in filtering, thresholding, grouping, visualization, etc)
The Delphi group uses two ~identical versions of this file.
Try to keep them in sync with edits, for sanity.
https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
See the delphi_utils README.md for usage examples.
"""

import contextlib
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/runner.py
@@ -51,6 +51,7 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],

#Get version and indicator name for startup
ind_name = indicator_fn.__module__.replace(".run", "")

#Check for version.cfg in indicator directory
if os.path.exists("version.cfg"):
with open("version.cfg") as ver_file:
@@ -59,9 +60,15 @@
if "current_version" in line:
current_version = str.strip(line)
current_version = current_version.replace("current_version = ", "")
#Logging - Starting Indicator
logger.info(f"Started {ind_name} with covidcast-indicators version {current_version}")
else: logger.info(f"Started {ind_name} without version.cfg")
logger.info(
"Started a covidcast-indicator",
indicator_name=ind_name,
current_version=current_version,
)
else:
logger.info(
"Started a covidcast-indicator without version.cfg", indicator_name=ind_name
)

indicator_fn(params)
validator = validator_fn(params)
@@ -77,8 +84,10 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
break
time.sleep(1)
else:
logger.error(f"Flash step timed out ({timer} s), terminating",
elapsed_time_in_seconds = round(time.time() - start, 2))
logger.error(
"Flash step timed out, terminating",
elapsed_time_in_seconds=round(time.time() - start, 2),
)
t1.terminate()
t1.join()
if validator: