From 0ed5e018d69fd65a620779c946cd7a011c9c6f51 Mon Sep 17 00:00:00 2001
From: Jeff Laiosa
Date: Mon, 20 Apr 2020 10:20:59 -0400
Subject: [PATCH] Begin work for caching anomalies and sending alerts

---
 app/check_dataset.py     | 49 +++++++++++++++++++++++++++++++++++++---
 app/checks.py            | 30 ++++++++++++------------
 resources/exceptions.csv | 21 +++++++++++++++++
 3 files changed, 81 insertions(+), 19 deletions(-)
 create mode 100644 resources/exceptions.csv

diff --git a/app/check_dataset.py b/app/check_dataset.py
index 928ea7e..7c55369 100644
--- a/app/check_dataset.py
+++ b/app/check_dataset.py
@@ -7,6 +7,7 @@
 from datetime import datetime
 from argparse import ArgumentParser, Namespace, RawDescriptionHelpFormatter
 from datetime import timedelta
+from operator import itemgetter
 
 import app.checks as checks
 from .qc_config import QCConfig
@@ -179,6 +180,24 @@ def check_current(ds: DataSource, config: QCConfig) -> ResultLog:
     log.consolidate()
     return log
 
+def find_in_exceptions(row, df):
+    """
+    Return True if a row matching on state, date and exception_type
+    already exists in the exceptions dataframe
+    """
+    state, date, exception_type = itemgetter('state', 'date', 'exception_type')(row)
+    found = df.loc[(df['state'] == state) & (df['date'] == date) & (df['exception_type'] == exception_type)]
+    return not found.empty
+
+def send_anomaly_info(anomalies):
+    """
+    Warn the user about each found anomaly, with instructions on how
+    to add it to the list of allowed exceptions
+    """
+    for a in anomalies:
+        state, date, exception_type = itemgetter('state', 'date', 'exception_type')(a)
+        print(f'Found anomaly in {state} on {date} for type {exception_type}')
+        print('You should verify this with the Data Entry or QA team,\nthen enter it into the list of allowed exceptions: "./resources/exceptions.csv"')
+        print(f'{state},{date.strftime("%Y-%m-%d")},{exception_type}')
 
 def check_history(ds: DataSource) -> ResultLog:
     """
@@ -187,15 +206,39 @@
     log = ResultLog()
 
+    exceptions_cached = pd.read_csv('./resources/exceptions.csv', parse_dates=['date'])
+
     df = ds.history
     if is_missing(df):
         log.internal("Source", "History not available")
         return None
 
-    for state in df["state"].drop_duplicates().values:
-        state_df = df.loc[df["state"] == state]
-        checks.monotonically_increasing(state_df, log)
+    for state in df["state"].unique():
+        exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
+        if exceptions:
+            not_cached = [x for x in exceptions if not find_in_exceptions(x, exceptions_cached)]
+            if not_cached:
+                send_anomaly_info(not_cached)
 
     log.consolidate()
     return log
 
+def cache_exceptions(ds: DataSource) -> None:
+    """
+    Check the history and create a new exceptions file
+    """
+    log = ResultLog()
+
+    df = ds.history
+    if is_missing(df):
+        log.internal("Source", "History not available")
+        return None
+
+    # accumulate exceptions here, then save them as the new cache
+    exceptions_df = pd.DataFrame(columns=['state', 'date', 'exception_type'])
+
+    for state in df["state"].unique():
+        exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
+        if exceptions:
+            exceptions_df = exceptions_df.append(exceptions, ignore_index=True)
+
+    # normalize dates so the file matches the YYYY-MM-DD format it is read back in
+    exceptions_df['date'] = pd.to_datetime(exceptions_df['date']).dt.strftime('%Y-%m-%d')
+    exceptions_df.to_csv('./resources/exceptions.csv', index=False)
diff --git a/app/checks.py b/app/checks.py
index d6696df..49db66b 100644
--- a/app/checks.py
+++ b/app/checks.py
@@ -527,33 +527,31 @@ def increasing_values(row, df: pd.DataFrame, log: ResultLog, config: QCConfig =
 
 # ----------------------------------------------------------------
 
-def monotonically_increasing(df: pd.DataFrame, log: ResultLog):
+def monotonically_increasing(df: pd.DataFrame, state: str, log: ResultLog):
     """Check that timeseries values are monotonically increasing
 
     Input is expected to be the values for a single state
+
+    Returns a list of {state, date, exception_type} records, one for
+    each day-over-day decrease found
     """
 
-    columns_to_check = ["positive", "negative","hospitalized", "death"]
-
-    state = df["state"].min()
-    if state != df["state"].max():
+    columns_to_check = ["positive", "negative", "hospitalized", "death"]
+
+    if len(df['state'].unique()) > 1:
         raise Exception("Expected input to be for a single state")
+
+    df = df.sort_values(by="date", ascending=True)
+    df['date'] = pd.to_datetime(df['date'].astype('str'), format="%Y%m%d")
 
-    # TODO: don't group on state -- this is already filtered to a single state
-    df = df.sort_values(["state", "date"], ascending=True)
-    df_lagged = df.groupby("state")[columns_to_check] \
-        .shift(1) \
-        .rename(columns=lambda c: c+"_lag")
-
-    df_comparison = df.merge(df_lagged, left_index=True, right_index=True, how="left")
+    exceptions = []
 
     # check that all the counts are >= the previous day
     for col in columns_to_check:
-        if (df_comparison[f"{col}_lag"] > df_comparison[col]).any():
-            error_dates = df_comparison.loc[(df_comparison[f"{col}_lag"] > df_comparison[col])]["date"]
-            error_dates_str = error_dates.astype(str).str.cat(sep=", ")
-
+        dates = df.loc[df[col].diff() < 0, 'date']
+        if not dates.empty:
+            error_dates_str = dates.dt.strftime('%Y-%m-%d').str.cat(sep=", ")
             log.data_quality(state, f"{col} values decreased from the previous day (on {error_dates_str})")
+            for date in dates:
+                exceptions.append({'state': state, 'date': date, 'exception_type': col})
+
+    return exceptions
 
 # ----------------------------------------------------------------
 
diff --git a/resources/exceptions.csv b/resources/exceptions.csv
new file mode 100644
index 0000000..4c0f9da
--- /dev/null
+++ b/resources/exceptions.csv
@@ -0,0 +1,21 @@
+state,date,exception_type
+AK,2020-04-13,negative
+AR,2020-04-08,hospitalized
+AR,2020-04-16,hospitalized
+DC,2020-04-15,negative
+DE,2020-04-11,negative
+HI,2020-03-26,hospitalized
+IA,2020-04-09,hospitalized
+IN,2020-03-26,hospitalized
+MA,2020-04-18,hospitalized
+MS,2020-03-30,hospitalized
+MS,2020-04-07,hospitalized
+PA,2020-04-07,hospitalized
+RI,2020-03-07,positive
+SC,2020-04-03,hospitalized
+VA,2020-04-11,hospitalized
+VA,2020-04-05,death
+VT,2020-04-09,hospitalized
+WI,2020-03-30,negative
+WV,2020-04-04,hospitalized
+WV,2020-04-16,hospitalized
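
Reviewer note: the duplicate suppression in find_in_exceptions() reduces
to a three-column equality mask over the cached frame. A minimal
standalone sketch of that lookup (the inline frame stands in for
resources/exceptions.csv; the AK row is copied from the file added
above):

    import pandas as pd

    # stand-in for pd.read_csv('./resources/exceptions.csv', parse_dates=['date'])
    exceptions_cached = pd.DataFrame([
        {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'},
    ])

    # an anomaly record shaped like the ones checks.monotonically_increasing() returns
    anomaly = {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'}

    # same mask as find_in_exceptions(): match on all three key columns
    mask = ((exceptions_cached['state'] == anomaly['state'])
            & (exceptions_cached['date'] == anomaly['date'])
            & (exceptions_cached['exception_type'] == anomaly['exception_type']))
    print(mask.any())  # True -> already cached, so no alert is printed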
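
Reviewer note: the rewritten check in monotonically_increasing() flags
every date where a cumulative column drops below the previous day's
value. A standalone sketch with made-up numbers, mirroring the
diff()-based logic above (the 20200413 row is the deliberate decrease):

    import pandas as pd

    df = pd.DataFrame({
        'date': [20200410, 20200411, 20200412, 20200413],
        'positive': [100, 120, 150, 140],  # 150 -> 140 is a decrease
    })
    df = df.sort_values(by="date", ascending=True)
    df['date'] = pd.to_datetime(df['date'].astype('str'), format="%Y%m%d")

    # negative day-over-day differences mark the offending dates
    dates = df.loc[df['positive'].diff() < 0, 'date']
    print(dates.dt.strftime('%Y-%m-%d').str.cat(sep=", "))  # 2020-04-13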