
Commit

Merge pull request #1897 from cmu-delphi/release/indicators_v0.3.47_utils_v0.3.21

Release covidcast-indicators 0.3.47
melange396 authored Sep 7, 2023
2 parents 33793e1 + d357ae3 commit da6edd5
Showing 22 changed files with 85 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.46
+current_version = 0.3.47
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
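
These .bumpversion.cfg files drive the bump2version tool: it rewrites current_version in place and, because commit = True, commits with the templated message. A rough sketch of the core behavior, assuming only the config file itself needs updating (real bumpversion also rewrites every file listed in its [bumpversion:file:...] sections):

import configparser
import subprocess

def bump(cfg_path=".bumpversion.cfg", new_version="0.3.47"):
    cfg = configparser.ConfigParser()
    cfg.read(cfg_path)
    section = cfg["bumpversion"]
    section["current_version"] = new_version
    with open(cfg_path, "w") as f:
        cfg.write(f)                       # rewrite the config with the new version
    if section.getboolean("commit"):
        msg = section["message"].format(new_version=new_version)
        subprocess.run(["git", "commit", "-am", msg], check=True)
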
2 changes: 1 addition & 1 deletion _delphi_utils_python/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.20
+current_version = 0.3.21
 commit = True
 message = chore: bump delphi_utils to {new_version}
 tag = False
2 changes: 1 addition & 1 deletion _delphi_utils_python/delphi_utils/__init__.py
@@ -15,4 +15,4 @@
 from .nancodes import Nans
 from .weekday import Weekday
 
-__version__ = "0.3.20"
+__version__ = "0.3.21"
6 changes: 5 additions & 1 deletion _delphi_utils_python/delphi_utils/runner.py
@@ -72,9 +72,13 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
         start = time.time()
         while time.time()-start < timer:
             if not t1.is_alive():
+                logger.info("Completed flash step",
+                            elapsed_time_in_seconds = round(time.time() - start, 2))
                 break
-            time.sleep(10)
+            time.sleep(1)
+        else:
+            logger.error(f"Flash step timed out ({timer} s), terminating",
+                         elapsed_time_in_seconds = round(time.time() - start, 2))
         t1.terminate()
         t1.join()
     if validator:
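
Review note: the new code leans on Python's while/else, where the else branch runs only if the loop condition goes false without hitting break, i.e. only on timeout. A self-contained sketch of the same monitoring pattern (the worker function and the 3-second timeout are invented for illustration):

import time
from multiprocessing import Process

def worker():
    time.sleep(5)  # stand-in for the real flash step

if __name__ == "__main__":
    t1 = Process(target=worker)
    t1.start()
    timer = 3
    start = time.time()
    while time.time() - start < timer:
        if not t1.is_alive():
            print("completed in", round(time.time() - start, 2), "s")
            break             # skips the else below
        time.sleep(1)
    else:
        # reached only when the while condition expired with no break: timeout
        print("timed out, terminating")
    t1.terminate()            # no-op if the process already exited
    t1.join()
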
19 changes: 14 additions & 5 deletions _delphi_utils_python/delphi_utils/validator/dynamic.py
@@ -237,7 +237,9 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_min_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too long ago"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too long ago "
+                                          f"({max_date} < {self.params.generation_date} - {min_thres})"))
 
         report.increment_total_checks()

@@ -263,7 +265,9 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_max_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too recent"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too recent "
+                                          f"({max_date} > {self.params.generation_date} - {max_thres})"))
 
         report.increment_total_checks()

@@ -307,7 +311,9 @@ def create_dfs(self, geo_sig_df, api_df_or_error, checking_date, geo_type, signa
                     signal_type,
                     "test data for a given checking date-geo type-signal type"
                     " combination is missing. Source data may be missing"
-                    " for one or more dates"))
+                    " for one or more dates "
+                    f"({checking_date} < {self.params.generation_date} "
+                    f"- {min_thres})"))
             return False
 
         # Reference dataframe runs backwards from the recent_cutoff_date

@@ -418,7 +424,9 @@ def check_max_date_vs_reference(self, df_to_test, df_to_reference, checking_date
                     checking_date,
                     geo_type,
                     signal_type,
-                    "reference df has days beyond the max date in the =df_to_test="))
+                    "reference df has days beyond the max date in the =df_to_test= "
+                    f"{df_to_test['time_value'].max()} < "
+                    f"{df_to_reference['time_value'].max().date()}"))
 
         report.increment_total_checks()

@@ -459,7 +467,8 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date
                     geo_type,
                     signal_type,
                     "Number of rows per day seems to have changed rapidly (reference "
-                    "vs test data)"))
+                    "vs test data); "
+                    f"relative difference: {abs(compare_rows)} > 0.35"))
         report.increment_total_checks()
 
     def check_positive_negative_spikes(self, source_df, api_frames, geo, sig, report):
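
All five changes follow one pattern: append the observed values to the failure message via adjacent string literals, where a plain literal and an f-string concatenate into a single string. A quick illustration with invented values:

max_date = "2023-08-20"
generation_date = "2023-09-07"
min_thres = "7 days, 0:00:00"
message = ("date of most recent generated file seems too long ago "
           f"({max_date} < {generation_date} - {min_thres})")
print(message)
# date of most recent generated file seems too long ago (2023-08-20 < 2023-09-07 - 7 days, 0:00:00)
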
13 changes: 11 additions & 2 deletions _delphi_utils_python/delphi_utils/validator/report.py
@@ -35,6 +35,8 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
             Warnings raised from validation execution
         unsuppressed_errors: List[Exception]
             Errors raised from validation failures not found in `self.errors_to_suppress`
+        elapsed_time_in_seconds: float
+            Elapsed time of validation run, rounded down
         """
         self.errors_to_suppress = errors_to_suppress
         self.data_source = data_source

@@ -44,6 +46,7 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
         self.raised_warnings = []
         self.unsuppressed_errors = []
         self.dry_run = dry_run
+        self.elapsed_time_in_seconds = -1
         # pylint: enable=R0902
 
     def add_raised_error(self, error):

@@ -68,6 +71,10 @@ def increment_total_checks(self):
         """Record a check."""
         self.total_checks += 1
 
+    def set_elapsed_time_in_seconds(self, time):
+        """Set elapsed runtime in seconds for later logging."""
+        self.elapsed_time_in_seconds = time
+
     def add_raised_warning(self, warning):
         """Add a warning to the report.

@@ -94,15 +101,17 @@ def log(self, logger=None):
                         checks_failed = len(self.unsuppressed_errors),
                         checks_suppressed = self.num_suppressed,
                         warnings = len(self.raised_warnings),
-                        phase = "validation")
+                        phase = "validation",
+                        elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         else:
             logger.info("Validation run unsuccessful",
                         data_source = self.data_source,
                         checks_run = self.total_checks,
                         checks_failed = len(self.unsuppressed_errors),
                         checks_suppressed = self.num_suppressed,
                         warnings = len(self.raised_warnings),
-                        phase="validation")
+                        phase="validation",
+                        elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         # Threshold for slack alerts if warnings are excessive,
         # Currently extremely strict, set by observation of 1 month's logs
         excessive_warnings = self.total_checks > 0 and \
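
The keyword arguments passed to logger.info here are structured-logging fields, not printf arguments; delphi_utils builds its logger on structlog, so each kwarg becomes a key in the emitted event. A minimal sketch with plain structlog, assuming it is installed (the delphi_utils get_structured_logger wrapper configures more processors than shown):

import structlog

structlog.configure(processors=[structlog.processors.JSONRenderer()])
logger = structlog.get_logger()
logger.info("Validation run successful",
            data_source="example_source",   # invented values
            checks_run=120, checks_failed=0,
            phase="validation",
            elapsed_time_in_seconds=12.34)
# {"data_source": "example_source", ..., "event": "Validation run successful"}
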
30 changes: 19 additions & 11 deletions _delphi_utils_python/delphi_utils/validator/static.py
@@ -178,17 +178,17 @@ def check_bad_geo_id_value(self, df_to_test, filename, geo_type, report):
         - report: ValidationReport; report where results are added
         """
         valid_geos = self._get_valid_geo_values(geo_type)
-        unexpected_geos = [geo for geo in df_to_test['geo_id']
-                           if geo.lower() not in valid_geos]
+        unexpected_geos = {geo for geo in df_to_test['geo_id']
+                           if geo.lower() not in valid_geos}
         if len(unexpected_geos) > 0:
             report.add_raised_error(
                 ValidationFailure(
                     "check_bad_geo_id_value",
                     filename=filename,
                     message=f"Unrecognized geo_ids (not in historical data) {unexpected_geos}"))
         report.increment_total_checks()
-        upper_case_geos = [
-            geo for geo in df_to_test['geo_id'] if geo.lower() != geo]
+        upper_case_geos = {
+            geo for geo in df_to_test['geo_id'] if geo.lower() != geo}
         if len(upper_case_geos) > 0:
             report.add_raised_warning(
                 ValidationFailure(

@@ -218,8 +218,8 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
         if geo_type in numeric_geo_types:
             # Check if geo_ids were stored as floats (contain decimal point) and
             # contents before decimal match the specified regex pattern.
-            leftover = [geo[1] for geo in df_to_test["geo_id"].str.split(
-                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])]
+            leftover = {geo[1] for geo in df_to_test["geo_id"].str.split(
+                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])}
 
             # If any floats found, remove decimal and anything after.
             if len(leftover) > 0:

@@ -230,7 +230,7 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
                     ValidationFailure(
                         "check_geo_id_type",
                         filename=nameformat,
-                        message="geo_ids saved as floats; strings preferred"))
+                        message=f"geo_ids saved as floats; strings preferred: {leftover}"))
 
         if geo_type in fill_len.keys():
             # Left-pad with zeroes up to expected length. Fixes missing leading zeroes

@@ -281,29 +281,35 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report):
 
         if percent_option:
             if not df_to_test[(df_to_test['val'] > 100)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure(
                         "check_val_pct_gt_100",
                         filename=nameformat,
-                        message="val column can't have any cell greater than 100 for percents"))
+                        message="val column can't have any cell greater than 100 for percents; "
+                                f"invalid values: {bad_values}"))
 
             report.increment_total_checks()
 
         if proportion_option:
             if not df_to_test[(df_to_test['val'] > 100000)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100000)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure("check_val_prop_gt_100k",
                                       filename=nameformat,
                                       message="val column can't have any cell greater than 100000 "
-                                              "for proportions"))
+                                              "for proportions; "
+                                              f"invalid values: {bad_values}"))
 
             report.increment_total_checks()
 
         if not df_to_test[(df_to_test['val'] < 0)].empty:
+            bad_values = df_to_test[(df_to_test['val'] < 0)]['val'].unique()
             report.add_raised_error(
                 ValidationFailure("check_val_lt_0",
                                   filename=nameformat,
-                                  message="val column can't have any cell smaller than 0"))
+                                  message="val column can't have any cell smaller than 0; "
+                                          f"invalid values: {bad_values}"))
 
             report.increment_total_checks()

@@ -346,10 +352,12 @@ def check_bad_se(self, df_to_test, nameformat, report):
         report.increment_total_checks()
 
         if df_to_test["se"].isnull().mean() > 0.5:
+            bad_mean = round(df_to_test["se"].isnull().mean() * 100, 2)
             report.add_raised_error(
                 ValidationFailure("check_se_many_missing",
                                   filename=nameformat,
-                                  message='Recent se values are >50% NA'))
+                                  message='Recent se values are >50% NA: '
+                                          f'{bad_mean}%'))
 
         report.increment_total_checks()
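
The list-to-set comprehension changes are deduplication: a file with thousands of rows for one bad geo_id previously repeated it thousands of times in the failure message, and now reports it once. For example:

geo_ids = ["ak", "AK", "zz", "zz", "zz"]
valid_geos = {"ak"}
bad_list = [g for g in geo_ids if g.lower() not in valid_geos]  # ['zz', 'zz', 'zz']
bad_set = {g for g in geo_ids if g.lower() not in valid_geos}   # {'zz'}
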
3 changes: 3 additions & 0 deletions _delphi_utils_python/delphi_utils/validator/validate.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 """Tools to validate CSV source data, including various check methods."""
+import time
 from .datafetcher import load_all_files
 from .dynamic import DynamicValidator
 from .errors import ValidationFailure

@@ -54,11 +55,13 @@ def validate(self):
         Returns:
             - ValidationReport collating the validation outcomes
         """
+        start_time = time.time()
         report = ValidationReport(self.suppressed_errors, self.data_source, self.dry_run)
         frames_list = load_all_files(self.export_dir, self.time_window.start_date,
                                      self.time_window.end_date)
         self.static_validation.validate(frames_list, report)
         # Dynamic Validation only performed when frames_list is populated
         if len(frames_list) > 0:
             self.dynamic_validation.validate(aggregate_frames(frames_list), report)
+        report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
         return report
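
The timing idiom added here (and in runner.py above) measures wall-clock time with time.time(), which can jump if the system clock is adjusted mid-run; time.monotonic() is the usual alternative for durations. A sketch of both, illustrative only, since the repo consistently uses time.time():

import time

def do_work():
    time.sleep(0.1)  # stand-in workload

start = time.time()
do_work()
print(round(time.time() - start, 2))       # wall clock; can go backwards

start = time.monotonic()
do_work()
print(round(time.monotonic() - start, 2))  # monotonic; safe for durations
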
2 changes: 1 addition & 1 deletion _delphi_utils_python/setup.py
@@ -27,7 +27,7 @@
 
 setup(
     name="delphi_utils",
-    version="0.3.20",
+    version="0.3.21",
     description="Shared Utility Functions for Indicators",
     long_description=long_description,
     long_description_content_type="text/markdown",
2 changes: 2 additions & 0 deletions changehc/delphi_changehc/update_sensor.py
@@ -162,6 +162,8 @@ def geo_reindex(self, data):
                                                date_col=Config.DATE_COL)
             # this line should be removed once the fix is implemented for megacounties
             data_frame = data_frame[~((data_frame['county'].str.len() > 5) | (data_frame['county'].str.contains('_')))]
+            # handle rogue \N:
+            data_frame = data_frame[data_frame['county'] != r'\N']
         elif geo == "state":
             data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state",
                                               date_col=Config.DATE_COL)
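
\N is the token MySQL and similar tools emit for NULL in text exports, and the raw string r'\N' matches those two literal characters (backslash, N) rather than an escape sequence. The filter in action, on toy data:

import pandas as pd

df = pd.DataFrame({"county": ["01001", r"\N", "04007"], "num": [1, 2, 3]})
df = df[df["county"] != r"\N"]   # drop the rogue NULL-marker rows
print(df)
#   county  num
# 0  01001    1
# 2  04007    3
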
5 changes: 5 additions & 0 deletions changehc/tests/test_update_sensor.py
@@ -89,9 +89,14 @@ def test_geo_reindex(self):
                 "fips": ['01001'] * 7 + ['04007'] * 6,
                 "den": [1000] * 7 + [2000] * 6,
                 "timestamp": [pd.Timestamp(f'03-{i}-2020') for i in range(1, 14)]})
+            if geo == "county":  # test for rogue \N
+                row_contain_N = {"num": 700, "fips": r"\N", "den": 2000, "timestamp": pd.Timestamp("03-15-2020")}
+                test_data = test_data.append(row_contain_N, ignore_index=True)
             data_frame = su_inst.geo_reindex(test_data)
             assert data_frame.shape[0] == multiple*len(su_inst.fit_dates)
             assert (data_frame.sum(numeric_only=True) == (4200,19000)).all()
+            if geo == "county":
+                assert r'\N' not in data_frame.index.get_level_values('county')
 
     def test_update_sensor(self):
         """Tests that the sensors are properly updated."""
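
One caveat on the test: DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, so this line only runs under an older pandas pin. The pd.concat equivalent, should the pin ever move (toy frame shown for self-containment):

import pandas as pd

test_data = pd.DataFrame({"num": [600], "fips": ["01001"], "den": [1000],
                          "timestamp": [pd.Timestamp("03-01-2020")]})
row_contain_N = {"num": 700, "fips": r"\N", "den": 2000,
                 "timestamp": pd.Timestamp("03-15-2020")}
# pd.concat replaces the removed DataFrame.append
test_data = pd.concat([test_data, pd.DataFrame([row_contain_N])], ignore_index=True)
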
2 changes: 1 addition & 1 deletion changehc/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion claims_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion doctor_visits/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion google_symptoms/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion hhs_hosp/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion nchs_mortality/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion nowcast/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
9 changes: 8 additions & 1 deletion quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -2,6 +2,7 @@
 """Collect and process Quidel export files."""
 from os.path import join
 import os
+import time
 from datetime import datetime, timedelta
 import boto3

@@ -369,7 +370,7 @@ def check_export_start_date(export_start_date, export_end_date,
         return datetime(2020, 5, 26)
     return export_start_date
 
-def update_cache_file(df, _end_date, cache_dir):
+def update_cache_file(df, _end_date, cache_dir, logger):
     """
     Update cache file. Remove the old one, export the new one.

@@ -380,8 +381,14 @@ def update_cache_file(df, _end_date, cache_dir):
         The most recent date when the raw data is received
     cache_dir:
         ./cache where the cache file is stored
+    logger: logging.Logger
+        Structured logger.
     """
+    start_time = time.time()
     for fn in os.listdir(cache_dir):
         if ".csv" in fn:
             os.remove(join(cache_dir, fn))
     df.to_csv(join(cache_dir, "pulled_until_%s.csv") % _end_date.strftime("%Y%m%d"), index=False)
+    logger.info("Completed cache file update",
+                end_date = _end_date.strftime('%Y-%m-%d'),
+                elapsed_time_in_seconds = round(time.time() - start_time, 2))
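
A subtlety in the export line: the % substitution applies to the entire joined path, which works because os.path.join returns the "pulled_until_%s.csv" template intact inside the path. An equivalent, arguably clearer spelling (a sketch with an invented cache path, not the repo's code):

from os.path import join

cache_dir = "./cache"  # hypothetical
stamp = "20230907"
out_path = join(cache_dir, "pulled_until_%s.csv") % stamp   # what the code does
same_path = join(cache_dir, f"pulled_until_{stamp}.csv")    # clearer equivalent
assert out_path == same_path
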
6 changes: 5 additions & 1 deletion quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -121,6 +121,7 @@ def run_module(params: Dict[str, Any]):
         __name__, filename=params["common"].get("log_filename"),
         log_exceptions=params["common"].get("log_exceptions", True))
     stats = []
+    # Log at program exit in case of an exception, otherwise after successful completion
     atexit.register(log_exit, start_time, stats, logger)
     cache_dir = params["indicator"]["input_cache_dir"]
     export_dir = params["common"]["export_dir"]

@@ -223,4 +224,7 @@ def run_module(params: Dict[str, Any]):
 
     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
-    update_cache_file(df, _end_date, cache_dir)
+    update_cache_file(df, _end_date, cache_dir, logger)
+    # Log stats now instead of at program exit
+    atexit.unregister(log_exit)
+    log_exit(start_time, stats, logger)
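
The atexit pairing ensures the run stats are logged exactly once: the handler fires at interpreter exit if the pipeline dies early, but on success it is unregistered and invoked directly so the log is written before teardown. A minimal sketch of the pattern (log_exit here is a stand-in for the repo's helper):

import atexit
import time

def log_exit(start_time):
    print("run finished in", round(time.time() - start_time, 2), "s")

start_time = time.time()
atexit.register(log_exit, start_time)  # safety net: fires even after an uncaught exception
# ... pipeline work happens here ...
atexit.unregister(log_exit)            # success path: remove the exit hook
log_exit(start_time)                   # and log the stats immediately
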
2 changes: 1 addition & 1 deletion quidel_covidtest/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
2 changes: 1 addition & 1 deletion sir_complainsalot/version.cfg
@@ -1 +1 @@
-current_version = 0.3.46
+current_version = 0.3.47
