From f37211b7a830bd720801c2bfbde6a13ee9f4397c Mon Sep 17 00:00:00 2001 From: Rostyslav Zatserkovnyi Date: Tue, 8 Aug 2023 09:25:42 +0300 Subject: [PATCH 1/9] Performance testing testing workflow (#1253) --- .../workflows/performance-tests-periodic.yml | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 .github/workflows/performance-tests-periodic.yml diff --git a/.github/workflows/performance-tests-periodic.yml b/.github/workflows/performance-tests-periodic.yml new file mode 100644 index 000000000..f013bd5e3 --- /dev/null +++ b/.github/workflows/performance-tests-periodic.yml @@ -0,0 +1,102 @@ +name: One-time performance testing - 9th August 2023 + +# Run "At every 30th minute on day-of-month 9 in August" +on: + schedule: + - cron: '*/30 * 9 8 *' + +# Add some extra perms to comment on a PR +permissions: + pull-requests: write + contents: read + +jobs: + run-perftests: + runs-on: ubuntu-latest + outputs: + request_count: ${{ steps.output.outputs.request_count }} + failure_count: ${{ steps.output.outputs.failure_count }} + med_time: ${{ steps.output.outputs.med_time }} + avg_time: ${{ steps.output.outputs.avg_time }} + min_time: ${{ steps.output.outputs.min_time }} + max_time: ${{ steps.output.outputs.max_time }} + requests_per_sec: ${{ steps.output.outputs.requests_per_sec }} + steps: + - name: Set up WireGuard + uses: egor-tensin/setup-wireguard@v1.2.0 + with: + endpoint: '${{ secrets.WG_PERF_ENDPOINT }}' + endpoint_public_key: '${{ secrets.WG_PERF_ENDPOINT_PUBLIC_KEY }}' + ips: '${{ secrets.WG_PERF_IPS }}' + allowed_ips: '${{ secrets.WG_PERF_ALLOWED_IPS }}' + private_key: '${{ secrets.WG_PERF_PRIVATE_KEY }}' + - name: Check out repository + uses: actions/checkout@v3 + - name: Set up repository # mimics install.sh in the README except that delphi is cloned from the PR rather than main + run: | + cd .. + mkdir -p driver/repos/delphi + cd driver/repos/delphi + git clone https://github.com/cmu-delphi/operations + git clone https://github.com/cmu-delphi/utils + git clone https://github.com/cmu-delphi/flu-contest + git clone https://github.com/cmu-delphi/nowcast + cd ../../ + + cd .. + cp -R delphi-epidata driver/repos/delphi/delphi-epidata + cd - + + ln -s repos/delphi/delphi-epidata/dev/local/Makefile + - name: Build & run epidata + run: | + cd ../driver + sudo make web sql="${{ secrets.DB_CONN_STRING }}" + sudo make redis + - name: Check out delphi-admin + uses: actions/checkout@v3 + with: + repository: cmu-delphi/delphi-admin + token: ${{ secrets.CMU_DELPHI_DEPLOY_MACHINE_PAT }} + path: delphi-admin + - name: Build & run Locust + continue-on-error: true # sometimes ~2-5 queries fail, we shouldn't end the run if that's the case + run: | + cd delphi-admin/load-testing/locust + docker build -t locust . 
+ export CSV=v4-requests-small.csv + touch output_stats.csv && chmod 666 output_stats.csv + touch output_stats_history.csv && chmod 666 output_stats_history.csv + touch output_failures.csv && chmod 666 output_failures.csv + touch output_exceptions.csv && chmod 666 output_exceptions.csv + docker run --net=host -v $PWD:/mnt/locust -e CSV="/mnt/locust/${CSV}" locust -f /mnt/locust/v4.py --host http://127.0.0.1:10080/ --users 10 --spawn-rate 1 --headless -i "$(cat ${CSV} | wc -l)" --csv=/mnt/locust/output + - name: Produce output for summary + id: output + uses: jannekem/run-python-script-action@v1 + with: + script: | + import os + + def write_string(name, value): + with open(os.environ['GITHUB_OUTPUT'], 'a') as fh: + print(f'{name}={value}', file=fh) + + def write_float(name, value): + write_string(name, "{:.2f}".format(float(value))) + + with open("delphi-admin/load-testing/locust/output_stats.csv", "r", encoding="utf-8", errors="ignore") as scraped: + final_line = scraped.readlines()[-1].split(",") + write_string('request_count', final_line[2]) + write_string('failure_count', final_line[3]) + write_float('med_time', final_line[4]) + write_float('avg_time', final_line[5]) + write_float('min_time', final_line[6]) + write_float('max_time', final_line[7]) + write_float('requests_per_sec', final_line[9]) + + - name: Archive results as artifacts + uses: actions/upload-artifact@v3 + with: + name: locust-output + path: | + delphi-admin/load-testing/locust/output_*.csv From d3f683a927cb4173d90117a5eed9f48517f28253 Mon Sep 17 00:00:00 2001 From: Alex Reinhart Date: Thu, 10 Aug 2023 14:37:28 -0400 Subject: [PATCH 2/9] New CTIS publication (#1255) --- docs/symptom-survey/publications.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/symptom-survey/publications.md b/docs/symptom-survey/publications.md index d72277455..378ef5249 100644 --- a/docs/symptom-survey/publications.md +++ b/docs/symptom-survey/publications.md @@ -26,6 +26,9 @@ Pandemic"](https://www.pnas.org/topic/548) in *PNAS*: Research publications using the survey data include: +- Uyheng, J., Robertson, D.C. & Carley, K.M. (2023). [Bridging online and offline + dynamics of the face mask infodemic](https://doi.org/10.1186/s44247-023-00026-z). + *BMC Digital Health* 1, 27. - Kobayashi H, Saenz-Escarcega R, Fulk A, Agusto FB (2023). [Understanding mental health trends during COVID-19 pandemic in the United States using network analysis](https://doi.org/10.1371/journal.pone.0286857). *PLoS From e16094477f802346ccb667ddee907d1bae47e7df Mon Sep 17 00:00:00 2001 From: Alex Reinhart Date: Thu, 10 Aug 2023 14:48:30 -0400 Subject: [PATCH 3/9] Newer CTIS publication --- docs/symptom-survey/publications.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/symptom-survey/publications.md b/docs/symptom-survey/publications.md index 378ef5249..b03856e23 100644 --- a/docs/symptom-survey/publications.md +++ b/docs/symptom-survey/publications.md @@ -26,6 +26,10 @@ Pandemic"](https://www.pnas.org/topic/548) in *PNAS*: Research publications using the survey data include: +- M. Rubinstein, Z. Branson, and E.H. Kennedy (2023). [Heterogeneous + interventional effects with multiple mediators: Semiparametric and + nonparametric approaches](https://doi.org/10.1515/jci-2022-0070). *Journal of + Causal Inference* 11 (1), 20220070. - Uyheng, J., Robertson, D.C. & Carley, K.M. (2023). [Bridging online and offline dynamics of the face mask infodemic](https://doi.org/10.1186/s44247-023-00026-z). *BMC Digital Health* 1, 27. 
From 01365a11f3eb8ff6a0ba96254397b75d14726c73 Mon Sep 17 00:00:00 2001 From: melange396 Date: Mon, 14 Aug 2023 10:48:59 -0400 Subject: [PATCH 4/9] bring loggers in sync and add multiproc capabilities (#1254) --- src/common/logger.py | 190 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 172 insertions(+), 18 deletions(-) diff --git a/src/common/logger.py b/src/common/logger.py index 909e47fb7..d04ff7673 100644 --- a/src/common/logger.py +++ b/src/common/logger.py @@ -1,24 +1,50 @@ +"""Structured logger utility for creating JSON logs.""" + +# the Delphi group uses two ~identical versions of this file. +# try to keep them in sync with edits, for sanity. +# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long +# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py + +import contextlib import logging +import multiprocessing import os import sys import threading +from traceback import format_exception + import structlog def handle_exceptions(logger): """Handle exceptions using the provided logger.""" - def exception_handler(etype, value, traceback): - logger.exception("Top-level exception occurred", exc_info=(etype, value, traceback)) + def exception_handler(scope, etype, value, traceback): + logger.exception("Top-level exception occurred", + scope=scope, exc_info=(etype, value, traceback)) - def multithread_exception_handler(args): - exception_handler(args.exc_type, args.exc_value, args.exc_traceback) + def sys_exception_handler(etype, value, traceback): + exception_handler("sys", etype, value, traceback) - sys.excepthook = exception_handler - threading.excepthook = multithread_exception_handler + def threading_exception_handler(args): + if args.exc_type == SystemExit and args.exc_value.code == 0: + # `sys.exit(0)` is considered "successful termination": + # https://docs.python.org/3/library/sys.html#sys.exit + logger.debug("normal thread exit", thread=args.thread, + stack="".join( + format_exception( + args.exc_type, args.exc_value, args.exc_traceback))) + else: + exception_handler(f"thread: {args.thread}", + args.exc_type, args.exc_value, args.exc_traceback) + sys.excepthook = sys_exception_handler + threading.excepthook = threading_exception_handler -def get_structured_logger(name=__name__, filename=None, log_exceptions=True): + +def get_structured_logger(name=__name__, + filename=None, + log_exceptions=True): """Create a new structlog logger. Use the logger returned from this in indicator code using the standard @@ -38,22 +64,19 @@ def get_structured_logger(name=__name__, filename=None, log_exceptions=True): is a good choice. filename: An (optional) file to write log output. """ - # Configure the underlying logging configuration - handlers = [logging.StreamHandler()] - if filename: - handlers.append(logging.FileHandler(filename)) - + # Set the underlying logging configuration if "LOG_DEBUG" in os.environ: log_level = logging.DEBUG else: log_level = logging.INFO - logging.basicConfig(format="%(message)s", level=log_level, handlers=handlers) + logging.basicConfig( + format="%(message)s", + level=log_level, + handlers=[logging.StreamHandler()]) - def add_pid(logger, method_name, event_dict): - """ - Add current PID to the event dict. 
- """ + def add_pid(_logger, _method_name, event_dict): + """Add current PID to the event dict.""" event_dict["pid"] = os.getpid() return event_dict @@ -92,9 +115,140 @@ def add_pid(logger, method_name, event_dict): cache_logger_on_first_use=True, ) - logger = structlog.get_logger(name) + # Create the underlying python logger and wrap it with structlog + system_logger = logging.getLogger(name) + if filename and not system_logger.handlers: + system_logger.addHandler(logging.FileHandler(filename)) + system_logger.setLevel(log_level) + logger = structlog.wrap_logger(system_logger) if log_exceptions: handle_exceptions(logger) return logger + + +class LoggerThread(): + """ + A construct to use a logger from multiprocessing workers/jobs. + + the bare structlog loggers are thread-safe but not multiprocessing-safe. + a `LoggerThread` will spawn a thread that listens to a mp.Queue + and logs messages from it with the provided logger, + so other processes can send logging messages to it + via the logger-like `SubLogger` interface. + the SubLogger even logs the pid of the caller. + + this is good to use with a set of jobs that are part of a mp.Pool, + but isnt recommended for general use + because of overhead from threading and multiprocessing, + and because it might introduce lag to log messages. + + somewhat inspired by: + docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes + """ + + class SubLogger(): + """MP-safe logger-like interface to convey log messages to a listening LoggerThread.""" + + def __init__(self, queue): + """Create SubLogger with a bound queue.""" + self.queue = queue + + def _log(self, level, *args, **kwargs): + kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} + kwargs_plus.update(kwargs) + self.queue.put([level, args, kwargs_plus]) + + def debug(self, *args, **kwargs): + """Log a DEBUG level message.""" + self._log(logging.DEBUG, *args, **kwargs) + + def info(self, *args, **kwargs): + """Log an INFO level message.""" + self._log(logging.INFO, *args, **kwargs) + + def warning(self, *args, **kwargs): + """Log a WARNING level message.""" + self._log(logging.WARNING, *args, **kwargs) + + def error(self, *args, **kwargs): + """Log an ERROR level message.""" + self._log(logging.ERROR, *args, **kwargs) + + def critical(self, *args, **kwargs): + """Log a CRITICAL level message.""" + self._log(logging.CRITICAL, *args, **kwargs) + + + def get_sublogger(self): + """Retrieve SubLogger for this LoggerThread.""" + return self.sublogger + + def __init__(self, logger, q=None): + """Create and start LoggerThread with supplied logger, creating a queue if not provided.""" + self.logger = logger + if q: + self.msg_queue = q + else: + self.msg_queue = multiprocessing.Queue() + + def logger_thread_worker(): + logger.info('thread started') + while True: + msg = self.msg_queue.get() + if msg == 'STOP': + logger.debug('received stop signal') + break + level, args, kwargs = msg + if level in [logging.DEBUG, logging.INFO, logging.WARNING, + logging.ERROR, logging.CRITICAL]: + logger.log(level, *args, **kwargs) + else: + logger.error('received unknown logging level! 
exiting...', + level=level, args_kwargs=(args, kwargs)) + break + logger.debug('stopping thread') + + self.thread = threading.Thread(target=logger_thread_worker, + name="LoggerThread__"+logger.name) + logger.debug('starting thread') + self.thread.start() + + self.sublogger = LoggerThread.SubLogger(self.msg_queue) + self.running = True + + def stop(self): + """Terminate this LoggerThread.""" + if not self.running: + self.logger.warning('thread already stopped') + return + self.logger.debug('sending stop signal') + self.msg_queue.put('STOP') + self.thread.join() + self.running = False + self.logger.info('thread stopped') + + +@contextlib.contextmanager +def pool_and_threadedlogger(logger, *poolargs): + """ + Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger. + + Emulates the multiprocessing.Pool() context manager, + but also provides (via a LoggerThread) a SubLogger proxy to logger + that can be safely used by pool workers. + The SubLogger proxy interface supports these methods: debug, info, warning, error, + and critical. + Also "cleans up" the pool by waiting for workers to complete + as it exits the context. + """ + with multiprocessing.Manager() as manager: + logger_thread = LoggerThread(logger, manager.Queue()) + try: + with multiprocessing.Pool(*poolargs) as pool: + yield pool, logger_thread.get_sublogger() + pool.close() + pool.join() + finally: + logger_thread.stop() From c199403b0b2e1c26d8462367d919f83deaa3bbe2 Mon Sep 17 00:00:00 2001 From: minhkhul <118945681+minhkhul@users.noreply.github.com> Date: Mon, 14 Aug 2023 12:58:23 -0400 Subject: [PATCH 5/9] add syntax feature documentation (#1256) --- docs/api/covidcast.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/api/covidcast.md b/docs/api/covidcast.md index 245f6fe03..bf931fe4c 100644 --- a/docs/api/covidcast.md +++ b/docs/api/covidcast.md @@ -113,6 +113,16 @@ and lists. The current set of signals available for each data source is returned by the [`covidcast_meta`](covidcast_meta.md) endpoint. +#### Alternate Required Parameters + +The following parameters help specify multiple source-signal, timetype-timevalue or geotype-geovalue pairs. Use them instead of the usual required parameters. 
+ +| Parameter | Replaces | Format | Description | Example | +| --- | --- | --- | --- | --- | +| `signal` | `data_source`, `signal` | `signal={source}:{signal1},{signal2}` | Specify multiple source-signal pairs, grouped by source | `signal=src1:sig1`, `signal=src1:sig1,sig2`, `signal=src1:*`, `signal=src1:sig1;src2:sig3` | +| `time` | `time_type`, `time_values` | `time={timetype}:{timevalue1},{timevalue2}` | Specify multiple timetype-timevalue pairs, grouped by timetype | `time=day:*`, `time=day:20201201`, `time=day:20201201,20201202`, `time=day:20201201-20201204` | +| `geo` | `geo_type`, `geo_value` | `geo={geotype}:{geovalue1},{geovalue2}` | Specify multiple geotype-geovalue pairs, grouped by geotype | `geo=fips:*`, `geo=fips:04019`, `geo=fips:04019,19143`, `geo=fips:04019;msa:40660`, `geo=fips:*;msa:*` | + #### Optional Estimates for a specific `time_value` and `geo_value` are sometimes updated @@ -209,6 +219,12 @@ The `fields` parameter can be used to limit which fields are included in each re https://api.delphi.cmu.edu/epidata/covidcast/?data_source=fb-survey&signal=smoothed_cli&time_type=day&geo_type=county&time_values=20200406-20200410&geo_value=06001 +or + +https://api.delphi.cmu.edu/epidata/covidcast/?signal=fb-survey:smoothed_cli&time=day:20200406-20200410&geo=county:06001 + +Both of these URLs are equivalent and can be used to get the following result: + ```json { "result": 1, From 508c1fd33caf19a08433c452e7878489a4e26be5 Mon Sep 17 00:00:00 2001 From: Alex Reinhart Date: Mon, 14 Aug 2023 19:07:07 -0400 Subject: [PATCH 6/9] Newest CTIS publication --- docs/symptom-survey/publications.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/symptom-survey/publications.md b/docs/symptom-survey/publications.md index b03856e23..06c1d59c7 100644 --- a/docs/symptom-survey/publications.md +++ b/docs/symptom-survey/publications.md @@ -26,6 +26,10 @@ Pandemic"](https://www.pnas.org/topic/548) in *PNAS*: Research publications using the survey data include: +- C.K. Ettman, E. Badillo Goicoechea, and E.A. Stuart (2023). [Evolution of + depression and anxiety over the COVID-19 pandemic and across demographic + groups in a large sample of U.S. adults](https://doi.org/10.1016/j.focus.2023.100140). + *AJPM Focus*. - M. Rubinstein, Z. Branson, and E.H. Kennedy (2023). [Heterogeneous interventional effects with multiple mediators: Semiparametric and nonparametric approaches](https://doi.org/10.1515/jci-2022-0070). *Journal of From 11d327bc3de513516d9e6ab6b235d60d998a152d Mon Sep 17 00:00:00 2001 From: minhkhul Date: Wed, 16 Aug 2023 21:17:17 +0000 Subject: [PATCH 7/9] chore: sync to www-covidcast release v3.2.7 --- .../covidcast_utils/descriptions.raw.txt | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/src/server/endpoints/covidcast_utils/descriptions.raw.txt b/src/server/endpoints/covidcast_utils/descriptions.raw.txt index 85cfb1365..e0d057aab 100644 --- a/src/server/endpoints/covidcast_utils/descriptions.raw.txt +++ b/src/server/endpoints/covidcast_utils/descriptions.raw.txt @@ -42,32 +42,6 @@ SignalTooltip: Percentage of daily doctor visits that are due to lab-confirmed i Description: Delphi receives aggregated statistics from Change Healthcare, Inc. on lab-confirmed influenza outpatient doctor visits, derived from ICD codes found in insurance claims. Using this data, we estimate the percentage of daily doctor’s visits in each area that resulted in a diagnosis of influenza. 
Note that these estimates are based only on visits by patients whose insurance claims are accessible to Change Healthcare. ---- -Name: COVID Antigen Test Positivity -Id: quidel-covid-ag -Signal: covid_ag_smoothed_pct_positive -Unit: per 100 tests - - -SignalTooltip: Positivity rate of COVID-19 antigen tests, based on data provided by Quidel, Inc. - - -Description: Delphi receives data from COVID-19 antigen tests conducted by Quidel, a national provider of networked lab testing devices (covering doctors' offices, clinics, and hospitals). Antigen tests can detect parts of the virus that are present during an active infection.We report the percentage of COVID antigen tests that are positive. Note that this signal only includes Quidel’s antigen tests, not those run by other test providers. - - -AgeStratifications: - - Name: 0-4 - Signal: covid_ag_smoothed_pct_positive_age_0_4 - - Name: 5-17 - Signal: covid_ag_smoothed_pct_positive_age_5_17 - - Name: 18-49 - Signal: covid_ag_smoothed_pct_positive_age_18_49 - - Name: 50-64 - Signal: covid_ag_smoothed_pct_positive_age_50_64 - - Name: 65+ - Signal: covid_ag_smoothed_pct_positive_age_65plus - - --- Name: COVID Cases Id: jhu-csse From 425268b778ea0aa46c8830117c7186c49ecc60a5 Mon Sep 17 00:00:00 2001 From: melange396 Date: Thu, 17 Aug 2023 18:07:51 -0400 Subject: [PATCH 8/9] moving quidel signals to non-public access (#1261) with integration tests! --------- Co-authored-by: Dmytro Trotsko --- docs/api/covidcast-signals/quidel-inactive.md | 1 + docs/api/covidcast-signals/quidel.md | 2 + docs/api/covidcast_signals.md | 1 - .../server/test_covidcast_endpoints.py | 67 ++++++++++++++++++- integrations/server/test_covidcast_meta.py | 42 ++++++++++-- src/server/_security.py | 16 +++++ src/server/endpoints/covidcast.py | 37 ++++++++++ src/server/endpoints/covidcast_meta.py | 14 +++- 8 files changed, 171 insertions(+), 9 deletions(-) diff --git a/docs/api/covidcast-signals/quidel-inactive.md b/docs/api/covidcast-signals/quidel-inactive.md index 19b98ee6a..1c7583845 100644 --- a/docs/api/covidcast-signals/quidel-inactive.md +++ b/docs/api/covidcast-signals/quidel-inactive.md @@ -15,6 +15,7 @@ grand_parent: COVIDcast Main Endpoint 1. TOC {:toc} +## Accessibility: Delphi-internal only ## COVID-19 Tests These signals are still active. Documentation is available on the [Quidel page](quidel.md). diff --git a/docs/api/covidcast-signals/quidel.md b/docs/api/covidcast-signals/quidel.md index 54296dbb3..6cb153269 100644 --- a/docs/api/covidcast-signals/quidel.md +++ b/docs/api/covidcast-signals/quidel.md @@ -15,6 +15,8 @@ grand_parent: COVIDcast Main Endpoint 1. 
TOC {:toc} +## Accessibility: Delphi-internal only + ## COVID-19 Tests * **Earliest issue available:** July 29, 2020 diff --git a/docs/api/covidcast_signals.md b/docs/api/covidcast_signals.md index 895977010..4e69b6d73 100644 --- a/docs/api/covidcast_signals.md +++ b/docs/api/covidcast_signals.md @@ -36,7 +36,6 @@ dashboard](https://delphi.cmu.edu/covidcast/): | Early Indicators | COVID-Like Symptoms | [`fb-survey`](covidcast-signals/fb-survey.md) | `smoothed_wcli` | | Early Indicators | COVID-Like Symptoms in Community | [`fb-survey`](covidcast-signals/fb-survey.md) | `smoothed_whh_cmnty_cli` | | Early Indicators | COVID-Related Doctor Visits | [`doctor-visits`](covidcast-signals/doctor-visits.md) | `smoothed_adj_cli` | -| Cases and Testing | COVID Antigen Test Positivity (Quidel) | [`quidel`](covidcast-signals/quidel.md) | `covid_ag_smoothed_pct_positive` | | Cases and Testing | COVID Cases | [`jhu-csse`](covidcast-signals/jhu-csse.md) | `confirmed_7dav_incidence_prop` | | Late Indicators | COVID Hospital Admissions | [`hhs`](covidcast-signals/hhs.md) | `confirmed_admissions_covid_1d_prop_7dav` | | Late Indicators | Deaths | [`jhu-csse`](covidcast-signals/jhu-csse.md) | `deaths_7dav_incidence_prop` | diff --git a/integrations/server/test_covidcast_endpoints.py b/integrations/server/test_covidcast_endpoints.py index c86bb10d6..3ba0af039 100644 --- a/integrations/server/test_covidcast_endpoints.py +++ b/integrations/server/test_covidcast_endpoints.py @@ -26,7 +26,19 @@ def localSetUp(self): # reset the `covidcast_meta_cache` table (it should always have one row) self._db._cursor.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') - def _fetch(self, endpoint="/", is_compatibility=False, **params): + cur = self._db._cursor + # NOTE: we must specify the db schema "epidata" here because the cursor/connection are bound to schema "covid" + cur.execute("TRUNCATE TABLE epidata.api_user") + cur.execute("TRUNCATE TABLE epidata.user_role") + cur.execute("TRUNCATE TABLE epidata.user_role_link") + cur.execute("INSERT INTO epidata.api_user (api_key, email) VALUES ('quidel_key', 'quidel_email')") + cur.execute("INSERT INTO epidata.user_role (name) VALUES ('quidel')") + cur.execute( + "INSERT INTO epidata.user_role_link (user_id, role_id) SELECT api_user.id, user_role.id FROM epidata.api_user JOIN epidata.user_role WHERE api_key='quidel_key' and user_role.name='quidel'" + ) + cur.execute("INSERT INTO epidata.api_user (api_key, email) VALUES ('key', 'email')") + + def _fetch(self, endpoint="/", is_compatibility=False, auth=AUTH, **params): # make the request if is_compatibility: url = BASE_URL_OLD @@ -37,7 +49,7 @@ def _fetch(self, endpoint="/", is_compatibility=False, **params): params.setdefault("data_source", params.get("source")) else: url = f"{BASE_URL}{endpoint}" - response = requests.get(url, params=params, auth=AUTH) + response = requests.get(url, params=params, auth=auth) response.raise_for_status() return response.json() @@ -67,6 +79,28 @@ def test_basic(self): out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") self.assertEqual(len(out["epidata"]), len(rows)) + def test_basic_restricted_source(self): + """Request a signal from the / endpoint.""" + rows = [CovidcastTestRow.make_default_row(time_value=2020_04_01 + i, value=i, source="quidel") for i in range(10)] + first = rows[0] + self._insert_rows(rows) + + with self.subTest("validation"): + out = self._fetch("/") + self.assertEqual(out["result"], -1) + + with self.subTest("no_roles"): + 
out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") + self.assertEqual(len(out["epidata"]), 0) + + with self.subTest("no_api_key"): + out = self._fetch("/", auth=None, signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") + self.assertEqual(len(out["epidata"]), 0) + + with self.subTest("quidel_role"): + out = self._fetch("/", auth=("epidata", "quidel_key"), signal=first.signal_pair(), geo=first.geo_pair(), time="day:*") + self.assertEqual(len(out["epidata"]), len(rows)) + def test_compatibility(self): """Request at the /api.php endpoint.""" rows = [CovidcastTestRow.make_default_row(source="src", signal="sig", time_value=2020_04_01 + i, value=i) for i in range(10)] @@ -271,6 +305,35 @@ def test_meta(self): out = self._fetch("/meta", signal=f"{first.source}:X") self.assertEqual(len(out), 0) + def test_meta_restricted(self): + """Request 'restricted' signals from the /meta endpoint.""" + # NOTE: this method is nearly identical to ./test_covidcast_meta.py:test_restricted_sources() + # ...except the self._fetch() methods are different, as is the format of those methods' outputs + # (the other covidcast_meta endpoint uses APrinter, this one returns its own unadulterated json). + # additionally, the sample data used here must match entries (that is, named sources and signals) + # from covidcast_utils.model.data_sources (the `data_sources` variable from file + # src/server/endpoints/covidcast_utils/model.py, which is created by the _load_data_sources() method + # and fed by src/server/endpoints/covidcast_utils/db_sources.csv, but also surreptitiously augmened + # by _load_data_signals() which attaches a list of signals to each source, + # in turn fed by src/server/endpoints/covidcast_utils/db_signals.csv) + + # insert data from two different sources, one restricted/protected (quidel), one not + self._insert_rows([ + CovidcastTestRow.make_default_row(source="quidel", signal="raw_pct_negative"), + CovidcastTestRow.make_default_row(source="hhs", signal="confirmed_admissions_covid_1d") + ]) + + # update metadata cache + update_cache(args=None) + + # verify unauthenticated (no api key) or unauthorized (user w/o privilege) only see metadata for one source + self.assertEqual(len(self._fetch("/meta", auth=None)), 1) + self.assertEqual(len(self._fetch("/meta", auth=AUTH)), 1) + + # verify authorized user sees metadata for both sources + qauth = ('epidata', 'quidel_key') + self.assertEqual(len(self._fetch("/meta", auth=qauth)), 2) + def test_coverage(self): """Request a signal from the /coverage endpoint.""" diff --git a/integrations/server/test_covidcast_meta.py b/integrations/server/test_covidcast_meta.py index ad297f1e8..d03317c98 100644 --- a/integrations/server/test_covidcast_meta.py +++ b/integrations/server/test_covidcast_meta.py @@ -9,6 +9,7 @@ #first party from delphi_utils import Nans +from delphi.epidata.acquisition.covidcast.test_utils import CovidcastBase, CovidcastTestRow from delphi.epidata.maintenance.covidcast_meta_cache_updater import main as update_cache import delphi.operations.secrets as secrets @@ -17,7 +18,7 @@ AUTH = ('epidata', 'key') -class CovidcastMetaTests(unittest.TestCase): +class CovidcastMetaTests(CovidcastBase): """Tests the `covidcast_meta` endpoint.""" src_sig_lookups = { @@ -48,7 +49,7 @@ class CovidcastMetaTests(unittest.TestCase): %d, %d) ''' - def setUp(self): + def localSetUp(self): """Perform per-test setup.""" # connect to the `epidata` database and clear the `covidcast` table @@ -68,6 +69,17 @@ def setUp(self): # reset 
the `covidcast_meta_cache` table (it should always have one row) cur.execute('update covidcast_meta_cache set timestamp = 0, epidata = "[]"') + # NOTE: we must specify the db schema "epidata" here because the cursor/connection are bound to schema "covid" + cur.execute("TRUNCATE TABLE epidata.api_user") + cur.execute("TRUNCATE TABLE epidata.user_role") + cur.execute("TRUNCATE TABLE epidata.user_role_link") + cur.execute("INSERT INTO epidata.api_user (api_key, email) VALUES ('quidel_key', 'quidel_email')") + cur.execute("INSERT INTO epidata.user_role (name) VALUES ('quidel')") + cur.execute( + "INSERT INTO epidata.user_role_link (user_id, role_id) SELECT api_user.id, user_role.id FROM epidata.api_user JOIN epidata.user_role WHERE api_key='quidel_key' and user_role.name='quidel'" + ) + cur.execute("INSERT INTO epidata.api_user (api_key, email) VALUES ('key', 'email')") + # populate dimension tables for (src,sig) in self.src_sig_lookups: cur.execute(''' @@ -93,7 +105,7 @@ def setUp(self): secrets.db.epi = ('user', 'pass') - def tearDown(self): + def localTearDown(self): """Perform per-test teardown.""" self.cur.close() self.cnx.close() @@ -138,10 +150,10 @@ def _get_id(self): return self.id_counter @staticmethod - def _fetch(**kwargs): + def _fetch(auth=AUTH, **kwargs): params = kwargs.copy() params['endpoint'] = 'covidcast_meta' - response = requests.get(BASE_URL, params=params, auth=AUTH) + response = requests.get(BASE_URL, params=params, auth=auth) response.raise_for_status() return response.json() @@ -161,6 +173,26 @@ def test_round_trip(self): 'message': 'success', }) + def test_restricted_sources(self): + # NOTE: this method is nearly identical to ./test_covidcast_endpoints.py:test_meta_restricted() + + # insert data from two different sources, one restricted/protected (quidel), one not + self._insert_rows([ + CovidcastTestRow.make_default_row(source="quidel"), + CovidcastTestRow.make_default_row(source="not-quidel") + ]) + + # generate metadata cache + update_cache(args=None) + + # verify unauthenticated (no api key) or unauthorized (user w/o privilege) only see metadata for one source + self.assertEqual(len(self._fetch(auth=None)['epidata']), 1) + self.assertEqual(len(self._fetch(auth=AUTH)['epidata']), 1) + + # verify authorized user sees metadata for both sources + qauth = ('epidata', 'quidel_key') + self.assertEqual(len(self._fetch(auth=qauth)['epidata']), 2) + def test_filter(self): """Test filtering options some sample data.""" diff --git a/src/server/_security.py b/src/server/_security.py index 38294eb10..c47f948a5 100644 --- a/src/server/_security.py +++ b/src/server/_security.py @@ -82,6 +82,22 @@ def decorated_function(*args, **kwargs): return decorator_wrapper +# key is data "source" name, value is role name required to access that source +sources_protected_by_roles = { + 'quidel': 'quidel', + # the following two entries are needed because + # the covidcast endpoint uses this method + # to allow using various different "source" name aliases: + # delphi.epidata.server.endpoints.covidcast_utils.model.create_source_signal_alias_mapper() + # which, for reference, is populated by the file: + # src/server/endpoints/covidcast_utils/db_sources.csv + 'quidel-covid-ag': 'quidel', + 'quidel-flu': 'quidel', +} +# TODO(): source this info from a better place than a hardcoded dict: +# maybe somewhere in the db? maybe in src/server/endpoints/covidcast_utils/db_sources.csv ? 
+ + def update_key_last_time_used(user): if user: # update last usage for this user's api key to "now()" diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index c1350b490..bd336dacf 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -30,11 +30,13 @@ ) from .._query import QueryBuilder, execute_query, run_query, parse_row, filter_fields from .._printer import create_printer, CSVPrinter +from .._security import current_user, sources_protected_by_roles from .._validate import require_all from .._pandas import as_pandas, print_pandas from .covidcast_utils import compute_trend, compute_trends, compute_trend_value, CovidcastMetaEntry from ..utils import shift_day_value, day_to_time_value, time_value_to_iso, time_value_to_day, shift_week_value, time_value_to_week, guess_time_value_is_day, week_to_time_value, TimeValues from .covidcast_utils.model import TimeType, count_signal_time_types, data_sources, create_source_signal_alias_mapper +from delphi.epidata.common.logger import get_structured_logger # first argument is the endpoint name bp = Blueprint("covidcast", __name__) @@ -43,9 +45,30 @@ latest_table = "epimetric_latest_v" history_table = "epimetric_full_v" +def restrict_by_roles(source_signal_sets): + # takes a list of SourceSignalSet objects + # and returns only those from the list + # that the current user is permitted to access. + user = current_user + allowed_source_signal_sets = [] + for src_sig_set in source_signal_sets: + src = src_sig_set.source + if src in sources_protected_by_roles: + role = sources_protected_by_roles[src] + if user and user.has_role(role): + allowed_source_signal_sets.append(src_sig_set) + else: + # protected src and user does not have permission => leave it out of the srcsig sets + get_structured_logger("covcast_endpt").warning("non-authZd request for restricted 'source'", api_key=(user and user.api_key), src=src) + else: + allowed_source_signal_sets.append(src_sig_set) + return allowed_source_signal_sets + + @bp.route("/", methods=("GET", "POST")) def handle(): source_signal_sets = parse_source_signal_sets() + source_signal_sets = restrict_by_roles(source_signal_sets) source_signal_sets, alias_mapper = create_source_signal_alias_mapper(source_signal_sets) time_set = parse_time_set() geo_sets = parse_geo_sets() @@ -102,6 +125,7 @@ def _verify_argument_time_type_matches(is_day_argument: bool, count_daily_signal def handle_trend(): require_all(request, "window", "date") source_signal_sets = parse_source_signal_sets() + source_signal_sets = restrict_by_roles(source_signal_sets) daily_signals, weekly_signals = count_signal_time_types(source_signal_sets) source_signal_sets, alias_mapper = create_source_signal_alias_mapper(source_signal_sets) geo_sets = parse_geo_sets() @@ -157,6 +181,7 @@ def gen(rows): def handle_trendseries(): require_all(request, "window") source_signal_sets = parse_source_signal_sets() + source_signal_sets = restrict_by_roles(source_signal_sets) daily_signals, weekly_signals = count_signal_time_types(source_signal_sets) source_signal_sets, alias_mapper = create_source_signal_alias_mapper(source_signal_sets) geo_sets = parse_geo_sets() @@ -405,8 +430,19 @@ def handle_meta(): entry = by_signal.setdefault((row["data_source"], row["signal"]), []) entry.append(row) + user = current_user sources: List[Dict[str, Any]] = [] for source in data_sources: + src = source.db_source + if src in sources_protected_by_roles: + role = sources_protected_by_roles[src] + if not (user and 
user.has_role(role)): + # if this is a protected source + # and the user doesnt have the allowed role + # (or if we have no user) + # then skip this source + continue + meta_signals: List[Dict[str, Any]] = [] for signal in source.signals: @@ -448,6 +484,7 @@ def handle_coverage(): """ source_signal_sets = parse_source_signal_sets() + source_signal_sets = restrict_by_roles(source_signal_sets) daily_signals, weekly_signals = count_signal_time_types(source_signal_sets) source_signal_sets, alias_mapper = create_source_signal_alias_mapper(source_signal_sets) diff --git a/src/server/endpoints/covidcast_meta.py b/src/server/endpoints/covidcast_meta.py index 52d0a06eb..35dc9f12e 100644 --- a/src/server/endpoints/covidcast_meta.py +++ b/src/server/endpoints/covidcast_meta.py @@ -8,6 +8,7 @@ from .._params import extract_strings from .._printer import create_printer from .._query import filter_fields +from .._security import current_user, sources_protected_by_roles from delphi.epidata.common.logger import get_structured_logger bp = Blueprint("covidcast_meta", __name__) @@ -71,17 +72,28 @@ def handle(): age = metadata["age"] reported_age = max(0, min(age, standard_age) - age_margin) + user = current_user + def cache_entry_gen(): for entry in metadata_list: if time_types and entry.get("time_type") not in time_types: continue if geo_types and entry.get("geo_type") not in geo_types: continue + entry_source = entry.get("data_source") + if entry_source in sources_protected_by_roles: + role = sources_protected_by_roles[entry_source] + if not (user and user.has_role(role)): + # if this is a protected source + # and the user doesnt have the allowed role + # (or if we have no user) + # then skip this source + continue if not signals: yield entry for signal in signals: # match source and (signal or no signal or signal = *) - if entry.get("data_source") == signal.source and ( + if entry_source == signal.source and ( signal.signal == "*" or signal.signal == entry.get("signal") ): yield entry From 72778e9a4e83649429ad63749c0a8f91850de840 Mon Sep 17 00:00:00 2001 From: melange396 Date: Thu, 17 Aug 2023 22:18:01 +0000 Subject: [PATCH 9/9] chore: release delphi-epidata 4.1.8 --- .bumpversion.cfg | 2 +- dev/local/setup.cfg | 2 +- src/client/delphi_epidata.R | 2 +- src/client/delphi_epidata.js | 2 +- src/client/packaging/npm/package.json | 2 +- src/client/packaging/pypi/delphi_epidata/__init__.py | 2 +- src/client/packaging/pypi/setup.py | 2 +- src/server/_config.py | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b3cdf4c17..d19fc0628 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.1.7 +current_version = 4.1.8 commit = False tag = False diff --git a/dev/local/setup.cfg b/dev/local/setup.cfg index c36750505..4ccf7a7de 100644 --- a/dev/local/setup.cfg +++ b/dev/local/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = Delphi Development -version = 4.1.7 +version = 4.1.8 [options] packages = diff --git a/src/client/delphi_epidata.R b/src/client/delphi_epidata.R index cdb2da385..b6a749c4c 100644 --- a/src/client/delphi_epidata.R +++ b/src/client/delphi_epidata.R @@ -15,7 +15,7 @@ Epidata <- (function() { # API base url BASE_URL <- getOption('epidata.url', default = 'https://api.delphi.cmu.edu/epidata/') - client_version <- '4.1.7' + client_version <- '4.1.8' auth <- getOption("epidata.auth", default = NA) diff --git a/src/client/delphi_epidata.js b/src/client/delphi_epidata.js index 6c658f86f..2e297fbc7 100644 --- 
a/src/client/delphi_epidata.js +++ b/src/client/delphi_epidata.js @@ -22,7 +22,7 @@ } })(this, function (exports, fetchImpl, jQuery) { const BASE_URL = "https://api.delphi.cmu.edu/epidata/"; - const client_version = "4.1.7"; + const client_version = "4.1.8"; // Helper function to cast values and/or ranges to strings function _listitem(value) { diff --git a/src/client/packaging/npm/package.json b/src/client/packaging/npm/package.json index f80f39750..d472d4679 100644 --- a/src/client/packaging/npm/package.json +++ b/src/client/packaging/npm/package.json @@ -2,7 +2,7 @@ "name": "delphi_epidata", "description": "Delphi Epidata API Client", "authors": "Delphi Group", - "version": "4.1.7", + "version": "4.1.8", "license": "MIT", "homepage": "https://github.com/cmu-delphi/delphi-epidata", "bugs": { diff --git a/src/client/packaging/pypi/delphi_epidata/__init__.py b/src/client/packaging/pypi/delphi_epidata/__init__.py index 80a1c0af8..2118464c1 100644 --- a/src/client/packaging/pypi/delphi_epidata/__init__.py +++ b/src/client/packaging/pypi/delphi_epidata/__init__.py @@ -1,4 +1,4 @@ from .delphi_epidata import Epidata name = "delphi_epidata" -__version__ = "4.1.7" +__version__ = "4.1.8" diff --git a/src/client/packaging/pypi/setup.py b/src/client/packaging/pypi/setup.py index c29dc1537..08849cfcd 100644 --- a/src/client/packaging/pypi/setup.py +++ b/src/client/packaging/pypi/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="delphi_epidata", - version="4.1.7", + version="4.1.8", author="David Farrow", author_email="dfarrow0@gmail.com", description="A programmatic interface to Delphi's Epidata API.", diff --git a/src/server/_config.py b/src/server/_config.py index 841e93b9e..6acbc7751 100644 --- a/src/server/_config.py +++ b/src/server/_config.py @@ -7,7 +7,7 @@ load_dotenv() -VERSION = "4.1.7" +VERSION = "4.1.8" MAX_RESULTS = int(10e6) MAX_COMPATIBILITY_RESULTS = int(3650)
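
A minimal usage sketch for the pool_and_threadedlogger() context manager introduced in PATCH 4/9, shown from a caller's point of view. The import path matches the delphi.epidata.common.logger imports used elsewhere in these patches, but the worker function, logger name, and pool size below are hypothetical, not part of the patch series.

```python
# Minimal sketch (assumptions noted above): log from multiprocessing pool
# workers through the SubLogger proxy provided by pool_and_threadedlogger().
from delphi.epidata.common.logger import get_structured_logger, pool_and_threadedlogger


def process_chunk(chunk_id, sublogger):
    # Runs in a pool worker; the SubLogger proxy is multiprocessing-safe
    # and automatically records the worker's pid alongside the message.
    sublogger.info("processing chunk", chunk=chunk_id)
    return chunk_id * 2


def main():
    logger = get_structured_logger("example_job")  # hypothetical logger name
    with pool_and_threadedlogger(logger, 4) as (pool, sublogger):
        # SubLogger supports debug/info/warning/error/critical like a regular logger.
        async_results = [pool.apply_async(process_chunk, (i, sublogger)) for i in range(8)]
        results = [r.get() for r in async_results]
    logger.info("all chunks processed", total=sum(results))


if __name__ == "__main__":
    main()
```

The Manager-backed queue created inside pool_and_threadedlogger() is what makes the SubLogger safe to hand to pool workers; the listening LoggerThread is stopped automatically when the context exits.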
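
A hedged client-side sketch of the role-based access control added in PATCH 8/9: quidel-derived sources are filtered out of /covidcast results unless the requesting API key's user has the quidel role. The base URL appears in the documentation examples above and the basic-auth form mirrors the integration tests; the key value and the specific query parameters are placeholders and are not guaranteed to return data.

```python
# Sketch only: the key value and query parameters below are placeholders.
import requests

BASE_URL = "https://api.delphi.cmu.edu/epidata/covidcast/"
params = {
    "signal": "quidel-covid-ag:covid_ag_smoothed_pct_positive",  # a role-protected source
    "time": "day:20210301-20210307",
    "geo": "nation:us",
}

# Without the "quidel" role (or with no key at all), protected source-signal
# pairs are dropped from the request, so no rows come back.
anonymous = requests.get(BASE_URL, params=params).json()

# With an API key whose user has the "quidel" role, the same query returns data.
privileged = requests.get(BASE_URL, params=params, auth=("epidata", "YOUR_QUIDEL_KEY")).json()
```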