diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 6a83715f3..ecb9f760a 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.46
+current_version = 0.3.47
 commit = True
 message = chore: bump covidcast-indicators to {new_version}
 tag = False
diff --git a/_delphi_utils_python/.bumpversion.cfg b/_delphi_utils_python/.bumpversion.cfg
index 630a769dd..72ed335ba 100644
--- a/_delphi_utils_python/.bumpversion.cfg
+++ b/_delphi_utils_python/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 0.3.20
+current_version = 0.3.21
 commit = True
 message = chore: bump delphi_utils to {new_version}
 tag = False
diff --git a/_delphi_utils_python/delphi_utils/__init__.py b/_delphi_utils_python/delphi_utils/__init__.py
index 3d713f571..38c74567c 100644
--- a/_delphi_utils_python/delphi_utils/__init__.py
+++ b/_delphi_utils_python/delphi_utils/__init__.py
@@ -15,4 +15,4 @@
 from .nancodes import Nans
 from .weekday import Weekday
 
-__version__ = "0.3.20"
+__version__ = "0.3.21"
diff --git a/_delphi_utils_python/delphi_utils/runner.py b/_delphi_utils_python/delphi_utils/runner.py
index 13fa1606d..abc28ba19 100644
--- a/_delphi_utils_python/delphi_utils/runner.py
+++ b/_delphi_utils_python/delphi_utils/runner.py
@@ -72,9 +72,13 @@ def run_indicator_pipeline(indicator_fn: Callable[[Params], None],
         start = time.time()
         while time.time()-start < timer:
             if not t1.is_alive():
+                logger.info("Completed flash step",
+                            elapsed_time_in_seconds = round(time.time() - start, 2))
                 break
-            time.sleep(10)
+            time.sleep(1)
         else:
+            logger.error(f"Flash step timed out ({timer} s), terminating",
+                         elapsed_time_in_seconds = round(time.time() - start, 2))
             t1.terminate()
             t1.join()
     if validator:
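The runner hunk above tightens the flash-step watchdog: poll every second instead of every ten, and log both the completion and the timeout paths with elapsed time. For reference, a minimal, self-contained sketch of the same poll-and-terminate pattern; `slow_step` and `run_with_timeout` are illustrative stand-ins, and the stdlib logger is used in place of the repo's structured logger (which accepts keyword fields such as `elapsed_time_in_seconds`). Note that Python's `while ... else` runs the `else` branch only when the loop condition goes false without a `break`, i.e. only when the timer expires:

```python
import logging
import time
from multiprocessing import Process

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def slow_step():
    """Stand-in for work that may hang."""
    time.sleep(5)

def run_with_timeout(timer=3):
    t1 = Process(target=slow_step)
    t1.start()
    start = time.time()
    while time.time() - start < timer:
        if not t1.is_alive():
            # the subprocess finished on its own
            logger.info("Completed step in %.2f s", time.time() - start)
            break
        time.sleep(1)  # poll once per second, as in the hunk above
    else:
        # the loop ran out the timer without breaking: kill the subprocess
        logger.error("Step timed out (%s s), terminating", timer)
        t1.terminate()
        t1.join()

if __name__ == "__main__":
    run_with_timeout()
```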
diff --git a/_delphi_utils_python/delphi_utils/validator/dynamic.py b/_delphi_utils_python/delphi_utils/validator/dynamic.py
index 4911628ee..6758086ab 100644
--- a/_delphi_utils_python/delphi_utils/validator/dynamic.py
+++ b/_delphi_utils_python/delphi_utils/validator/dynamic.py
@@ -237,7 +237,9 @@ def check_min_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_min_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too long ago"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too long ago "
+                                          f"({max_date} < {self.params.generation_date} - {min_thres})"))
 
         report.increment_total_checks()
 
@@ -263,7 +265,9 @@ def check_max_allowed_max_date(self, max_date, geo_type, signal_type, report):
                 ValidationFailure("check_max_max_date",
                                   geo_type=geo_type,
                                   signal=signal_type,
-                                  message="date of most recent generated file seems too recent"))
+                                  date=max_date,
+                                  message="date of most recent generated file seems too recent "
+                                          f"({max_date} > {self.params.generation_date} - {max_thres})"))
 
         report.increment_total_checks()
 
@@ -307,7 +311,9 @@ def create_dfs(self, geo_sig_df, api_df_or_error, checking_date, geo_type, signa
                     signal_type,
                     "test data for a given checking date-geo type-signal type"
                     " combination is missing. Source data may be missing"
-                    " for one or more dates"))
+                    " for one or more dates "
+                    f"({checking_date} < {self.params.generation_date} "
+                    f"- {min_thres})"))
             return False
 
         # Reference dataframe runs backwards from the recent_cutoff_date
@@ -418,7 +424,9 @@ def check_max_date_vs_reference(self, df_to_test, df_to_reference, checking_date
                 checking_date,
                 geo_type,
                 signal_type,
-                "reference df has days beyond the max date in the =df_to_test="))
+                "reference df has days beyond the max date in the =df_to_test= "
+                f"{df_to_test['time_value'].max()} < "
+                f"{df_to_reference['time_value'].max().date()}"))
 
         report.increment_total_checks()
 
@@ -459,7 +467,8 @@ def check_rapid_change_num_rows(self, df_to_test, df_to_reference, checking_date
                 geo_type,
                 signal_type,
                 "Number of rows per day seems to have changed rapidly (reference "
-                "vs test data)"))
+                "vs test data); "
+                f"relative difference: {abs(compare_rows)} > 0.35"))
 
         report.increment_total_checks()
 
     def check_positive_negative_spikes(self, source_df, api_frames, geo, sig, report):
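The pattern running through these message changes is to quote the failing comparison inline, so an on-call reader sees the offending value and threshold without rerunning the check. As a rough sketch of the two date-window checks (names are illustrative; the real checks live on `DynamicValidator` and report through `ValidationFailure`):

```python
from datetime import date, timedelta

def check_date_window(max_date, generation_date, min_thres, max_thres):
    """Return failure messages mirroring check_min/max_allowed_max_date."""
    failures = []
    if max_date < generation_date - min_thres:  # newest date is too stale
        failures.append("date of most recent generated file seems too long ago "
                        f"({max_date} < {generation_date} - {min_thres})")
    if max_date > generation_date - max_thres:  # newest date is too fresh
        failures.append("date of most recent generated file seems too recent "
                        f"({max_date} > {generation_date} - {max_thres})")
    return failures

# A file last dated 10 days before generation trips the "too long ago" check:
print(check_date_window(date(2023, 1, 1), date(2023, 1, 11),
                        timedelta(days=7), timedelta(days=1)))
```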
diff --git a/_delphi_utils_python/delphi_utils/validator/report.py b/_delphi_utils_python/delphi_utils/validator/report.py
index f0840de95..f7be58915 100644
--- a/_delphi_utils_python/delphi_utils/validator/report.py
+++ b/_delphi_utils_python/delphi_utils/validator/report.py
@@ -35,6 +35,8 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
             Warnings raised from validation execution
         unsuppressed_errors: List[Exception]
             Errors raised from validation failures not found in `self.errors_to_suppress`
+        elapsed_time_in_seconds: float
+            Elapsed time of validation run, rounded to two decimal places
         """
         self.errors_to_suppress = errors_to_suppress
         self.data_source = data_source
@@ -44,6 +46,7 @@ def __init__(self, errors_to_suppress: List[ValidationFailure],
         self.raised_warnings = []
         self.unsuppressed_errors = []
         self.dry_run = dry_run
+        self.elapsed_time_in_seconds = -1
         # pylint: enable=R0902
 
     def add_raised_error(self, error):
@@ -68,6 +71,10 @@
     def increment_total_checks(self):
         """Record a check."""
         self.total_checks += 1
 
+    def set_elapsed_time_in_seconds(self, time):
+        """Set elapsed runtime in seconds for later logging."""
+        self.elapsed_time_in_seconds = time
+
     def add_raised_warning(self, warning):
         """Add a warning to the report.
@@ -94,7 +101,8 @@ def log(self, logger=None):
                 checks_failed = len(self.unsuppressed_errors),
                 checks_suppressed = self.num_suppressed,
                 warnings = len(self.raised_warnings),
-                phase = "validation")
+                phase = "validation",
+                elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         else:
             logger.info("Validation run unsuccessful",
                 data_source = self.data_source,
@@ -102,7 +110,8 @@ def log(self, logger=None):
                 checks_failed = len(self.unsuppressed_errors),
                 checks_suppressed = self.num_suppressed,
                 warnings = len(self.raised_warnings),
-                phase="validation")
+                phase="validation",
+                elapsed_time_in_seconds=self.elapsed_time_in_seconds)
         # Threshold for slack alerts if warnings are excessive,
         # Currently extremely strict, set by observation of 1 month's logs
         excessive_warnings = self.total_checks > 0 and \
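delphi_utils emits these summary lines through a structured logger (built on structlog), so the new `elapsed_time_in_seconds` field rides along as a key-value pair rather than being interpolated into the message. A minimal sketch of the set-then-log flow, assuming structlog is installed; `MiniReport` is illustrative, not the real `ValidationReport`:

```python
import time
import structlog

class MiniReport:
    """Toy stand-in for ValidationReport's timing hooks."""

    def __init__(self):
        self.elapsed_time_in_seconds = -1  # sentinel until a run completes

    def set_elapsed_time_in_seconds(self, t):
        self.elapsed_time_in_seconds = t

    def log(self, logger):
        # keyword arguments become structured key-value pairs in the log event
        logger.info("Validation run successful",
                    phase="validation",
                    elapsed_time_in_seconds=self.elapsed_time_in_seconds)

start_time = time.time()
report = MiniReport()
# ... run checks here ...
report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
report.log(structlog.get_logger())
```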
diff --git a/_delphi_utils_python/delphi_utils/validator/static.py b/_delphi_utils_python/delphi_utils/validator/static.py
index d4449b27b..476cdc5e0 100644
--- a/_delphi_utils_python/delphi_utils/validator/static.py
+++ b/_delphi_utils_python/delphi_utils/validator/static.py
@@ -178,8 +178,8 @@ def check_bad_geo_id_value(self, df_to_test, filename, geo_type, report):
         - report: ValidationReport; report where results are added
         """
         valid_geos = self._get_valid_geo_values(geo_type)
-        unexpected_geos = [geo for geo in df_to_test['geo_id']
-                           if geo.lower() not in valid_geos]
+        unexpected_geos = {geo for geo in df_to_test['geo_id']
+                           if geo.lower() not in valid_geos}
         if len(unexpected_geos) > 0:
             report.add_raised_error(
                 ValidationFailure(
@@ -187,8 +187,8 @@ def check_bad_geo_id_value(self, df_to_test, filename, geo_type, report):
                     filename=filename,
                     message=f"Unrecognized geo_ids (not in historical data) {unexpected_geos}"))
         report.increment_total_checks()
-        upper_case_geos = [
-            geo for geo in df_to_test['geo_id'] if geo.lower() != geo]
+        upper_case_geos = {
+            geo for geo in df_to_test['geo_id'] if geo.lower() != geo}
         if len(upper_case_geos) > 0:
             report.add_raised_warning(
                 ValidationFailure(
@@ -218,8 +218,8 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
         if geo_type in numeric_geo_types:
             # Check if geo_ids were stored as floats (contain decimal point) and
             # contents before decimal match the specified regex pattern.
-            leftover = [geo[1] for geo in df_to_test["geo_id"].str.split(
-                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])]
+            leftover = {geo[1] for geo in df_to_test["geo_id"].str.split(
+                ".") if len(geo) > 1 and re.match(geo_regex, geo[0])}
 
             # If any floats found, remove decimal and anything after.
             if len(leftover) > 0:
@@ -230,7 +230,7 @@ def find_all_unexpected_geo_ids(df_to_test, geo_regex, geo_type):
                     ValidationFailure(
                         "check_geo_id_type",
                         filename=nameformat,
-                        message="geo_ids saved as floats; strings preferred"))
+                        message=f"geo_ids saved as floats; strings preferred: {leftover}"))
 
         if geo_type in fill_len.keys():
             # Left-pad with zeroes up to expected length. Fixes missing leading zeroes
@@ -281,29 +281,35 @@ def check_bad_val(self, df_to_test, nameformat, signal_type, report):
 
         if percent_option:
             if not df_to_test[(df_to_test['val'] > 100)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure(
                         "check_val_pct_gt_100",
                         filename=nameformat,
-                        message="val column can't have any cell greater than 100 for percents"))
+                        message="val column can't have any cell greater than 100 for percents; "
+                                f"invalid values: {bad_values}"))
 
             report.increment_total_checks()
 
         if proportion_option:
             if not df_to_test[(df_to_test['val'] > 100000)].empty:
+                bad_values = df_to_test[(df_to_test['val'] > 100000)]['val'].unique()
                 report.add_raised_error(
                     ValidationFailure("check_val_prop_gt_100k",
                                       filename=nameformat,
                                       message="val column can't have any cell greater than 100000 "
-                                              "for proportions"))
+                                              "for proportions; "
+                                              f"invalid values: {bad_values}"))
 
             report.increment_total_checks()
 
         if not df_to_test[(df_to_test['val'] < 0)].empty:
+            bad_values = df_to_test[(df_to_test['val'] < 0)]['val'].unique()
             report.add_raised_error(
                 ValidationFailure("check_val_lt_0",
                                   filename=nameformat,
-                                  message="val column can't have any cell smaller than 0"))
+                                  message="val column can't have any cell smaller than 0; "
+                                          f"invalid values: {bad_values}"))
 
         report.increment_total_checks()
 
@@ -346,10 +352,12 @@ def check_bad_se(self, df_to_test, nameformat, report):
             report.increment_total_checks()
 
             if df_to_test["se"].isnull().mean() > 0.5:
+                bad_mean = round(df_to_test["se"].isnull().mean() * 100, 2)
                 report.add_raised_error(
                     ValidationFailure("check_se_many_missing",
                                       filename=nameformat,
-                                      message='Recent se values are >50% NA'))
+                                      message='Recent se values are >50% NA: '
+                                              f'{bad_mean}%'))
 
             report.increment_total_checks()
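The list-to-set comprehension swaps in this file are about message quality: with thousands of rows, a list repeats each offending geo_id once per row, while a set names each offender once. A quick illustration with toy data:

```python
import pandas as pd

df_to_test = pd.DataFrame({"geo_id": ["01001", "xx", "xx", "xx"]})
valid_geos = {"01001"}

as_list = [geo for geo in df_to_test["geo_id"] if geo.lower() not in valid_geos]
as_set = {geo for geo in df_to_test["geo_id"] if geo.lower() not in valid_geos}

print(as_list)  # ['xx', 'xx', 'xx'] -- one entry per offending row
print(as_set)   # {'xx'}             -- one entry per unique offender
```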
diff --git a/_delphi_utils_python/delphi_utils/validator/validate.py b/_delphi_utils_python/delphi_utils/validator/validate.py
index 131aa93dc..2ba831f3f 100644
--- a/_delphi_utils_python/delphi_utils/validator/validate.py
+++ b/_delphi_utils_python/delphi_utils/validator/validate.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 """Tools to validate CSV source data, including various check methods."""
+import time
 from .datafetcher import load_all_files
 from .dynamic import DynamicValidator
 from .errors import ValidationFailure
@@ -54,6 +55,7 @@ def validate(self):
         Returns:
             - ValidationReport collating the validation outcomes
         """
+        start_time = time.time()
         report = ValidationReport(self.suppressed_errors, self.data_source, self.dry_run)
         frames_list = load_all_files(self.export_dir, self.time_window.start_date,
                                      self.time_window.end_date)
@@ -61,4 +63,5 @@ def validate(self):
         # Dynamic Validation only performed when frames_list is populated
         if len(frames_list) > 0:
             self.dynamic_validation.validate(aggregate_frames(frames_list), report)
+        report.set_elapsed_time_in_seconds(round(time.time() - start_time, 2))
         return report
diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py
index 2086bc3f3..b3aa86358 100644
--- a/_delphi_utils_python/setup.py
+++ b/_delphi_utils_python/setup.py
@@ -27,7 +27,7 @@
 
 setup(
     name="delphi_utils",
-    version="0.3.20",
+    version="0.3.21",
     description="Shared Utility Functions for Indicators",
     long_description=long_description,
     long_description_content_type="text/markdown",
diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py
index 11faffa3d..cb5b42a4b 100644
--- a/changehc/delphi_changehc/update_sensor.py
+++ b/changehc/delphi_changehc/update_sensor.py
@@ -162,6 +162,8 @@ def geo_reindex(self, data):
                                               date_col=Config.DATE_COL)
             # this line should be removed once the fix is implemented for megacounties
             data_frame = data_frame[~((data_frame['county'].str.len() > 5) | (data_frame['county'].str.contains('_')))]
+            # handle rogue \N:
+            data_frame = data_frame[data_frame['county'] != r'\N']
         elif geo == "state":
             data_frame = gmpr.replace_geocode(data, "fips", "state_id",
                                               new_col="state", date_col=Config.DATE_COL)
diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py
index 999fed7e8..7ef25a608 100644
--- a/changehc/tests/test_update_sensor.py
+++ b/changehc/tests/test_update_sensor.py
@@ -89,9 +89,14 @@ def test_geo_reindex(self):
                 "fips": ['01001'] * 7 + ['04007'] * 6,
                 "den": [1000] * 7 + [2000] * 6,
                 "timestamp": [pd.Timestamp(f'03-{i}-2020') for i in range(1, 14)]})
+            if geo == "county":  # test for rogue \N
+                row_contain_N = {"num": 700, "fips": r"\N", "den": 2000, "timestamp": pd.Timestamp("03-15-2020")}
+                test_data = test_data.append(row_contain_N, ignore_index=True)
             data_frame = su_inst.geo_reindex(test_data)
             assert data_frame.shape[0] == multiple*len(su_inst.fit_dates)
             assert (data_frame.sum(numeric_only=True) == (4200,19000)).all()
+            if geo == "county":
+                assert r'\N' not in data_frame.index.get_level_values('county')
 
     def test_update_sensor(self):
         """Tests that the sensors are properly updated."""
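On the rogue `\N` fix above: `\N` is the conventional NULL marker in MySQL dump/export files, so an unmapped value can surface in the county column as the literal two-character string backslash-N, which then fails FIPS handling downstream. The raw-string comparison removes exactly those rows. A self-contained sketch of the filter with illustrative data (not the indicator's real schema):

```python
import pandas as pd

data_frame = pd.DataFrame({"county": ["01001", r"\N", "04007"],
                           "num": [100, 700, 200]})

# r'\N' is the two characters backslash + N, matching the literal sentinel
data_frame = data_frame[data_frame["county"] != r"\N"]
print(data_frame)  # only the two real FIPS rows remain
```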
update_cache_file(df, _end_date, cache_dir): +def update_cache_file(df, _end_date, cache_dir, logger): """ Update cache file. Remove the old one, export the new one. @@ -380,8 +381,14 @@ def update_cache_file(df, _end_date, cache_dir): The most recent date when the raw data is received cache_dir: ./cache where the cache file is stored + logger: logging.Logger + Structured logger. """ + start_time = time.time() for fn in os.listdir(cache_dir): if ".csv" in fn: os.remove(join(cache_dir, fn)) df.to_csv(join(cache_dir, "pulled_until_%s.csv") % _end_date.strftime("%Y%m%d"), index=False) + logger.info("Completed cache file update", + end_date = _end_date.strftime('%Y-%m-%d'), + elapsed_time_in_seconds = round(time.time() - start_time, 2)) diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py index 3f9451d22..a59e0c101 100644 --- a/quidel_covidtest/delphi_quidel_covidtest/run.py +++ b/quidel_covidtest/delphi_quidel_covidtest/run.py @@ -121,6 +121,7 @@ def run_module(params: Dict[str, Any]): __name__, filename=params["common"].get("log_filename"), log_exceptions=params["common"].get("log_exceptions", True)) stats = [] + # Log at program exit in case of an exception, otherwise after successful completion atexit.register(log_exit, start_time, stats, logger) cache_dir = params["indicator"]["input_cache_dir"] export_dir = params["common"]["export_dir"] @@ -223,4 +224,7 @@ def run_module(params: Dict[str, Any]): # Export the cache file if the pipeline runs successfully. # Otherwise, don't update the cache file - update_cache_file(df, _end_date, cache_dir) + update_cache_file(df, _end_date, cache_dir, logger) + # Log stats now instead of at program exit + atexit.unregister(log_exit) + log_exit(start_time, stats, logger) diff --git a/quidel_covidtest/version.cfg b/quidel_covidtest/version.cfg index 8ff601e55..047910754 100644 --- a/quidel_covidtest/version.cfg +++ b/quidel_covidtest/version.cfg @@ -1 +1 @@ -current_version = 0.3.46 +current_version = 0.3.47 diff --git a/sir_complainsalot/version.cfg b/sir_complainsalot/version.cfg index 8ff601e55..047910754 100644 --- a/sir_complainsalot/version.cfg +++ b/sir_complainsalot/version.cfg @@ -1 +1 @@ -current_version = 0.3.46 +current_version = 0.3.47