This repository was archived by the owner on Sep 23, 2025. It is now read-only.
49 changes: 46 additions & 3 deletions app/check_dataset.py
@@ -7,6 +7,7 @@
from datetime import datetime
from argparse import ArgumentParser, Namespace, RawDescriptionHelpFormatter
from datetime import timedelta
from operator import itemgetter

import app.checks as checks
from .qc_config import QCConfig
@@ -179,6 +180,24 @@ def check_current(ds: DataSource, config: QCConfig) -> ResultLog:
log.consolidate()
return log

def find_in_exceptions(row, df):
    """
    Return True if no row in df matches this row's state, date and
    exception_type -- i.e. the anomaly has not been cached yet
    """
    state, date, exception_type = itemgetter('state', 'date', 'exception_type')(row)
    found = df.loc[(df['state'] == state) & (df['date'] == date) & (df['exception_type'] == exception_type)]
    return found.empty
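A quick illustration of the lookup against a tiny hypothetical cache (the rows here are made up for the example):

import pandas as pd

cache = pd.DataFrame([
    {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'},
])
known = {'state': 'AK', 'date': pd.Timestamp('2020-04-13'), 'exception_type': 'negative'}
novel = {'state': 'TX', 'date': pd.Timestamp('2020-04-14'), 'exception_type': 'death'}

find_in_exceptions(known, cache)   # False -- already cached, no alert needed
find_in_exceptions(novel, cache)   # True  -- new anomaly, should be reported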

def send_anomaly_info(anomalies):
    """
    Warn the user about found anomalies, with instructions on how
    to add each anomaly to the list of allowed exceptions
    """
    for a in anomalies:
        state, date, exception_type = itemgetter('state', 'date', 'exception_type')(a)
        print(f'Found anomaly in {state} on {date} for type {exception_type}')
        print('You should verify this with the Data Entry or QA team, \nthen enter it into the list of allowed exceptions: "./resources/exceptions.csv"')
        print(f'{state},{date.strftime("%Y-%m-%d")},{exception_type}')

Author:
I think this is also where we could hook in some notifications. I'll have to dig into the Slack API; I don't want to hammer the room with notifications.

Contributor:
I think the place to start is a once-per-day notification, based on either current or the history table, that runs at around 5PM ET (after the 2nd shift). You should be able to render the log to a text string and attach it as a snippet to a Slack message so it doesn't take up a lot of screen space unless you unroll it.

You should spawn a background thread in either the Flask app or the Pyro4 app that sends a message on startup and then sleeps until 5PM ET.
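A possible shape for that notifier -- just a sketch, assuming Python 3.9+ (for zoneinfo) and the slack_sdk package; the channel name, token variable, and render_log callable are hypothetical, not part of this change:

import os
import threading
import time
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from slack_sdk import WebClient

EASTERN = ZoneInfo("US/Eastern")

def _seconds_until_5pm_et() -> float:
    """Seconds from now until the next 5PM Eastern."""
    now = datetime.now(EASTERN)
    target = now.replace(hour=17, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()

def notify_loop(render_log) -> None:
    """Post a startup message, then send the rendered log once a day at 5PM ET."""
    client = WebClient(token=os.environ["SLACK_TOKEN"])
    client.chat_postMessage(channel="#data-quality", text="QC notifier started")
    while True:
        time.sleep(_seconds_until_5pm_et())
        # attach the log as a snippet so it stays collapsed unless unrolled
        client.files_upload(channels="#data-quality", content=render_log(), title="Daily QC log")

# spawned from the Flask or Pyro4 app at startup
threading.Thread(target=notify_loop, args=(lambda: "rendered log text",), daemon=True).start()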

def check_history(ds: DataSource) -> ResultLog:
"""
@@ -187,15 +206,39 @@ def check_history(ds: DataSource) -> ResultLog:

log = ResultLog()

exceptions_cached = pd.read_csv('./resources/exceptions.csv', parse_dates=['date'])
Author:
Is this the right place for exceptions? Maybe we have an anomalies directory and then we track these by type (state, us, etc.).

Contributor:
Nah, because it is under the source tree and we don't want it in revision control (I think).

Can you add a log dir to the .ini file, create the dir if it is missing, and default it to ~/logs/exceptions.csv?

Author:
I was thinking we would want it under source control. This way, we can detect when a new anomaly appears and alert the channel. If an anomaly is valid, we simply add it to the cached file so the room never gets alerted on that anomaly again. These would be fairly small files -- the current exceptions file has 20 entries and is < 1KB in size.
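If the file did move out of the source tree, the .ini-driven location might look something like this minimal sketch (the config filename, section, and key names here are hypothetical):

import configparser
from pathlib import Path

config = configparser.ConfigParser()
config.read("quality-control.ini")

# default to ~/logs and create the directory if it is missing
log_dir = Path(config.get("PATHS", "log_dir", fallback="~/logs")).expanduser()
log_dir.mkdir(parents=True, exist_ok=True)
exceptions_path = log_dir / "exceptions.csv"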


df = ds.history
if is_missing(df):
log.internal("Source", "History not available")
return None

for state in df["state"].drop_duplicates().values:
state_df = df.loc[df["state"] == state]
checks.monotonically_increasing(state_df, log)
for state in df["state"].unique():
exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
if (len(exceptions) > 0):
not_cached = list(filter(lambda x: find_in_exceptions(x, exceptions_cached), exceptions))
if (len(not_cached) > 0):
send_anomaly_info(not_cached)

log.consolidate()
return log

def cache_exceptions(ds: DataSource, log: ResultLog) -> None:
    """
    Check the history and create a new exceptions file
    """

    df = ds.history
    if is_missing(df):
        log.internal("Source", "History not available")
        return None

    # accumulate exceptions for saving
    exceptions_df = pd.DataFrame(columns=['state', 'date', 'exception_type'])

    for state in df["state"].unique():
        exceptions = checks.monotonically_increasing(df.loc[df["state"] == state], state, log)
        if len(exceptions) > 0:
            exceptions_df = exceptions_df.append(exceptions, ignore_index=True)

    exceptions_df.to_csv('./resources/exceptions.csv', index=False)

Author:
This may belong higher up. The idea is to have a place to go to build the anomaly cache; I don't think this would get run very often. It duplicates some code in check_history.

Contributor:
I think it belongs a level up, and it should call the current/history versions. Anything in those lists is/should be an exception.
30 changes: 14 additions & 16 deletions app/checks.py
@@ -527,33 +527,31 @@ def increasing_values(row, df: pd.DataFrame, log: ResultLog, config: QCConfig =

# ----------------------------------------------------------------

def monotonically_increasing(df: pd.DataFrame, log: ResultLog):
def monotonically_increasing(df: pd.DataFrame, state: str, log: ResultLog):
"""Check that timeseries values are monotonically increasing

Input is expected to be the values for a single state
"""

columns_to_check = ["positive", "negative","hospitalized", "death"]

state = df["state"].min()
if state != df["state"].max():

    if df['state'].nunique() > 1:
        raise Exception("Expected input to be for a single state")

df = df.sort_values(by="date", ascending=True)
    df['date'] = pd.to_datetime(df['date'].astype(str), format="%Y%m%d")

# TODO: don't group on state -- this is already filtered to a single state
df = df.sort_values(["state", "date"], ascending=True)
df_lagged = df.groupby("state")[columns_to_check] \
.shift(1) \
.rename(columns=lambda c: c+"_lag")

df_comparison = df.merge(df_lagged, left_index=True, right_index=True, how="left")
exceptions = []

# check that all the counts are >= the previous day
for col in columns_to_check:
if (df_comparison[f"{col}_lag"] > df_comparison[col]).any():
error_dates = df_comparison.loc[(df_comparison[f"{col}_lag"] > df_comparison[col])]["date"]
error_dates_str = error_dates.astype(str).str.cat(sep=", ")

        if not df[col].is_monotonic:
            dates = df.loc[df[col].diff() < 0, 'date']
            error_dates_str = dates.dt.strftime('%Y-%m-%d').str.cat(sep=", ")
            log.data_quality(state, f"{col} values decreased from the previous day (on {error_dates_str})")
            for date in dates:
                exceptions.append({'state': state, 'date': date, 'exception_type': col})

    return exceptions

Author:
I refactored this because initially I thought there was a bug. There wasn't. But I did find a shorter way to do the check and comparison with the pandas is_monotonic property and diff() function.
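A quick illustration of that is_monotonic / diff() pattern on a toy series (is_monotonic is an alias for is_monotonic_increasing in this era of pandas):

import pandas as pd

s = pd.Series([5, 7, 7, 6, 9])
s.is_monotonic       # False -- the 7 -> 6 drop breaks the increasing order
s.diff()             # NaN, 2.0, 0.0, -1.0, 3.0; negative entries mark decreases
s[s.diff() < 0]      # index 3 -> 6, the value that decreased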

# ----------------------------------------------------------------

21 changes: 21 additions & 0 deletions resources/exceptions.csv
@@ -0,0 +1,21 @@
state,date,exception_type
AK,2020-04-13,negative
AR,2020-04-08,hospitalized
AR,2020-04-16,hospitalized
DC,2020-04-15,negative
DE,2020-04-11,negative
HI,2020-03-26,hospitalized
IA,2020-04-09,hospitalized
IN,2020-03-26,hospitalized
MA,2020-04-18,hospitalized
MS,2020-03-30,hospitalized
MS,2020-04-07,hospitalized
PA,2020-04-07,hospitalized
RI,2020-03-07,positive
SC,2020-04-03,hospitalized
VA,2020-04-11,hospitalized
VA,2020-04-05,death
VT,2020-04-09,hospitalized
WI,2020-03-30,negative
WV,2020-04-04,hospitalized
WV,2020-04-16,hospitalized