Begin work for caching anomalies and sending alerts #6
base: master
```diff
@@ -7,6 +7,7 @@
 from datetime import datetime
 from argparse import ArgumentParser, Namespace, RawDescriptionHelpFormatter
 from datetime import timedelta
+from operator import itemgetter

 import app.checks as checks
 from .qc_config import QCConfig
```
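As an aside for reviewers: `itemgetter` builds a callable that pulls several keys at once, and it works on plain dicts as well as pandas Series, which is how the new helpers below unpack anomaly rows. A tiny standalone demo (values invented):

```python
from operator import itemgetter

# itemgetter('a', 'b', ...) returns a callable that yields a tuple of those
# keys; it accepts anything indexable by key (dict, pandas Series, ...).
row = {'state': 'AK', 'date': '2020-04-13', 'exception_type': 'negative'}
state, date, exception_type = itemgetter('state', 'date', 'exception_type')(row)
print(state, date, exception_type)  # AK 2020-04-13 negative
```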
```diff
@@ -179,6 +180,24 @@ def check_current(ds: DataSource, config: QCConfig) -> ResultLog:
     log.consolidate()
     return log


+def find_in_exceptions(row, df):
+    """
+    Return True if no row matching on state, date and exception_type
+    exists in df -- i.e. the anomaly is not yet in the cached exceptions
+    """
+    state, date, exception_type = itemgetter('state', 'date', 'exception_type')(row)
+    found = df.loc[(df['state'] == state) & (df['date'] == date) & (df['exception_type'] == exception_type)]
+    return found.empty
+
+
+def send_anomaly_info(anomalies):
+    """
+    Warn the user about each found anomaly, with instructions on how
+    to add it to the list of allowed exceptions
+    """
+    for a in anomalies:
+        state, date, exception_type = itemgetter('state', 'date', 'exception_type')(a)
+        print(f'Found anomaly in {state} on {date} for type {exception_type}')
+        print('You should verify this with the Data Entry or QA team,\nthen enter it into the list of allowed exceptions: "./resources/exceptions.csv"')
+        print(f'{state},{date.strftime("%Y-%m-%d")},{exception_type}')
+
+
 def check_history(ds: DataSource) -> ResultLog:
     """
```
```diff
@@ -187,15 +206,39 @@ def check_history(ds: DataSource) -> ResultLog:

     log = ResultLog()

+    exceptions_cached = pd.read_csv('./resources/exceptions.csv', parse_dates=['date'])
```
Author: Is this the right place for exceptions? Maybe we should have an anomalies directory and then track these by type (state, us, etc.).

Contributor: Nah, because it is under the source tree and we don't want it in revision control (I think). Can you add a log dir to the .ini file, create the dir if it is missing, and default it to ~/logs/exceptions.csv?

Author: I was thinking we would want it under source control. This way, we can detect when a new anomaly appears and alert the channel. If an anomaly is valid, we simply add it to the cached file so the room never gets alerted on that anomaly again. These would be fairly small files -- the current exceptions file has 20 exceptions and is < 1 KB in size.
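A minimal sketch of what the Contributor is asking for, assuming a configparser-style .ini; the ini file name, the `[paths]` section, and the `log_dir` key are all hypothetical, not the project's real config schema:

```python
import os
from configparser import ConfigParser

def get_exceptions_path(ini_file: str = 'quality-control.ini') -> str:
    """Read the log dir from the .ini, create it if missing, default to ~/logs.

    Sketch only: the ini file name, [paths] section, and log_dir key
    are assumptions.
    """
    parser = ConfigParser()
    parser.read(ini_file)
    log_dir = parser.get('paths', 'log_dir', fallback=os.path.expanduser('~/logs'))
    os.makedirs(log_dir, exist_ok=True)  # create the dir if it is missing
    return os.path.join(log_dir, 'exceptions.csv')
```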
```diff
     df = ds.history
     if is_missing(df):
         log.internal("Source", "History not available")
         return None

-    for state in df["state"].drop_duplicates().values:
-        state_df = df.loc[df["state"] == state]
-        checks.monotonically_increasing(state_df, log)
+    for state in df["state"].unique():
+        exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
+        if len(exceptions) > 0:
+            not_cached = list(filter(lambda x: find_in_exceptions(x, exceptions_cached), exceptions))
+            if len(not_cached) > 0:
+                send_anomaly_info(not_cached)

     log.consolidate()
     return log

+
+def cache_exceptions(ds: DataSource) -> None:
```
Author: This may belong higher up. The idea is that we have a place to go to build the anomaly cache. This would not get run very often, I don't think. It duplicates some code in check_history.

Contributor: I think it belongs a level up, and it should call the current/history versions. Anything in those lists is, or should be, an exception.
```diff
+    """
+    Check the history and create a new exceptions file
+    """
+    log = ResultLog()
+
+    df = ds.history
+    if is_missing(df):
+        log.internal("Source", "History not available")
+        return None
+
+    # For saving
+    exceptions_df = pd.DataFrame(columns=['state', 'date', 'exception_type'])
+
+    for state in df["state"].unique():
+        exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
+        if len(exceptions) > 0:
+            exceptions_df = exceptions_df.append(exceptions, ignore_index=True)
+
+    exceptions_df.to_csv('./resources/exceptions.csv', index=False)
```
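For reviewers, a small standalone demo of the caching filter's semantics. The AK and RI rows mirror entries in the cached CSV below; the TX anomaly is invented for illustration:

```python
import pandas as pd
from operator import itemgetter

# Cached exceptions, mirroring rows from resources/exceptions.csv
exceptions_cached = pd.DataFrame([
    {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'},
    {'state': 'RI', 'date': pd.Timestamp('2020-03-07'), 'exception_type': 'positive'},
])

def find_in_exceptions(row, df):
    """True if the anomaly is NOT already cached (same logic as the PR)."""
    state, date, exception_type = itemgetter('state', 'date', 'exception_type')(row)
    found = df.loc[(df['state'] == state) & (df['date'] == date) &
                   (df['exception_type'] == exception_type)]
    return found.empty

anomalies = [
    {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'},  # already cached
    {'state': 'TX', 'date': pd.Timestamp('2020-04-20'), 'exception_type': 'death'},     # invented, not cached
]
not_cached = list(filter(lambda x: find_in_exceptions(x, exceptions_cached), anomalies))
print(not_cached)  # only the TX anomaly survives -> it alone goes to send_anomaly_info
```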
In the checks module (imported above as `app.checks`):
```diff
@@ -527,33 +527,31 @@ def increasing_values(row, df: pd.DataFrame, log: ResultLog, config: QCConfig =

 # ----------------------------------------------------------------

-def monotonically_increasing(df: pd.DataFrame, log: ResultLog):
+def monotonically_increasing(df: pd.DataFrame, state: str, log: ResultLog):
     """Check that timeseries values are monotonically increasing

     Input is expected to be the values for a single state
     """

     columns_to_check = ["positive", "negative", "hospitalized", "death"]

-    state = df["state"].min()
-    if state != df["state"].max():
+    if len(df['state'].unique()) > 1:
         raise Exception("Expected input to be for a single state")

-    # TODO: don't group on state -- this is already filtered to a single state
-    df = df.sort_values(["state", "date"], ascending=True)
-    df_lagged = df.groupby("state")[columns_to_check] \
-        .shift(1) \
-        .rename(columns=lambda c: c+"_lag")
-
-    df_comparison = df.merge(df_lagged, left_index=True, right_index=True, how="left")
+    df = df.sort_values(by="date", ascending=True)
+    df['date'] = pd.to_datetime(df['date'].astype('str'), format="%Y%m%d")
+
+    exceptions = []

     # check that all the counts are >= the previous day
     for col in columns_to_check:
-        if (df_comparison[f"{col}_lag"] > df_comparison[col]).any():
-            error_dates = df_comparison.loc[df_comparison[f"{col}_lag"] > df_comparison[col]]["date"]
-            error_dates_str = error_dates.astype(str).str.cat(sep=", ")
+        if not df[col].is_monotonic:
+            dates = df.loc[df[col].diff() < 0, 'date']
```
Author: I refactored this because initially I thought there was a bug. There wasn't, but I did find a shorter way to do the checking and comparison with the pandas `is_monotonic` and `diff()` calls.
```diff
+            error_dates_str = dates.dt.strftime('%Y-%m-%d').values
             log.data_quality(state, f"{col} values decreased from the previous day (on {error_dates_str})")
+            for date in dates:
+                exceptions.append({'state': state, 'date': date, 'exception_type': col})
+
+    return exceptions

 # ----------------------------------------------------------------
```
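To make the refactor concrete, a tiny standalone demo of the two pandas calls it leans on (the numbers are invented):

```python
import pandas as pd

# A toy cumulative series that dips once: the count drops from 12 to 10 on 2020-04-03.
df = pd.DataFrame({
    'date': pd.to_datetime(['20200401', '20200402', '20200403', '20200404'], format='%Y%m%d'),
    'positive': [10, 12, 10, 15],
})

print(df['positive'].is_monotonic)  # False -- the series decreased at least once
print(df.loc[df['positive'].diff() < 0, 'date'].dt.strftime('%Y-%m-%d').values)
# ['2020-04-03'] -- the day the count went down, the same rows the check logs
```

Note that `Series.is_monotonic` was an alias for `is_monotonic_increasing` and was removed in pandas 2.0; the code here reflects the pandas API current at the time of the PR.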
And the new exceptions file, ./resources/exceptions.csv (`@@ -0,0 +1,21 @@`):

```
state,date,exception_type
AK,2020-04-13,negative
AR,2020-04-08,hospitalized
AR,2020-04-16,hospitalized
DC,2020-04-15,negative
DE,2020-04-11,negative
HI,2020-03-26,hospitalized
IA,2020-04-09,hospitalized
IN,2020-03-26,hospitalized
MA,2020-04-18,hospitalized
MS,2020-03-30,hospitalized
MS,2020-04-07,hospitalized
PA,2020-04-07,hospitalized
RI,2020-03-07,positive
SC,2020-04-03,hospitalized
VA,2020-04-11,hospitalized
VA,2020-04-05,death
VT,2020-04-09,hospitalized
WI,2020-03-30,negative
WV,2020-04-04,hospitalized
WV,2020-04-16,hospitalized
```
Author: I think this is also where we could hook in some notifications. I'll have to dig into the Slack API; I don't want to hammer the room with notifications.

Contributor: I think the place to start is a once-per-day notification, based on either current or the history table, that runs at around 5 PM ET (after the 2nd shift). You should be able to render the log to a text string and attach it as a snippet to a Slack message, so it doesn't take up a lot of screen space unless you unroll it.

You should spawn a background thread in either the Flask app or the Pyro4 app that sends a message on startup and then sleeps until 5 PM ET.
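A minimal sketch of that plan, assuming the `slack_sdk` WebClient; the token, the channel name, and the `render_log` callable are all placeholders, and the timezone handling naively assumes the host clock runs on ET:

```python
import threading
import time
from datetime import datetime, timedelta

from slack_sdk import WebClient  # assumption: slack_sdk is the Slack client used

SLACK_TOKEN = "xoxb-..."   # placeholder token
CHANNEL = "#data-quality"  # hypothetical channel name

def seconds_until_5pm() -> float:
    """Seconds until the next 17:00 (sketch: assumes the host clock is ET)."""
    now = datetime.now()
    target = now.replace(hour=17, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()

def post_log_snippet(log_text: str) -> None:
    """Upload the rendered log as a snippet so it stays collapsed in the channel."""
    client = WebClient(token=SLACK_TOKEN)
    client.files_upload(channels=CHANNEL, content=log_text,
                        title="QC anomalies", filetype="text")

def notifier_loop(render_log) -> None:
    post_log_snippet(render_log())       # send a message on startup...
    while True:
        time.sleep(seconds_until_5pm())  # ...then sleep until 5 PM ET
        post_log_snippet(render_log())

# At Flask/Pyro4 app startup, with render_log a callable (assumed to exist)
# that renders the current ResultLog to a text string:
# threading.Thread(target=notifier_loop, args=(render_log,), daemon=True).start()
```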