Wastewater core #1513

Draft: wants to merge 5 commits into `dev`
23 changes: 20 additions & 3 deletions docs/epidata_development.md
@@ -87,11 +87,14 @@ You can test your changes manually by:

What follows is a worked demonstration based on the `fluview` endpoint. Before
starting, make sure that you have the `delphi_database_epidata`,
`delphi_web_epidata`, and `delphi_redis` containers running. To start them, from the `driver` directory, run

1. `make db` for `delphi_database_epidata`
2. `make web` for `delphi_web_epidata`
3. `make redis` for `delphi_redis`

You can check that they are running via `docker ps`; if they aren't, see the Makefile instructions above.

First, let's insert some fake data into the `fluview` table:


```bash
# If you have the mysql client installed locally:
echo 'insert into fluview values \
@@ -118,6 +121,20 @@ For the inserts above, absence of command-line output is a sign of success. On
the other hand, output after the insertion likely indicates failure (like, for
example, attempting to insert a duplicate unique key).

Alternatively, if you would prefer to interact with the database via Python (for
example, to prototype functions and database commands):

```python
from sqlalchemy import create_engine

engine = create_engine(
"mysql+pymysql://user:pass@127.0.0.1:13306/epidata",
echo=True,
)
engine.connect()
```
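
For example, continuing with the `engine` defined above, you can issue a quick
sanity-check query (a minimal sketch, assuming the containers are running and
the `fluview` table exists):

```python
from sqlalchemy import text

with engine.connect() as conn:
    # count the rows currently in the fluview table
    n_rows = conn.execute(text("SELECT COUNT(*) FROM fluview")).scalar()
    print(f"fluview has {n_rows} rows")
```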


Next, you can query the API directly (and parse with Python's JSON tool):

```bash
241 changes: 178 additions & 63 deletions docs/new_endpoint_tutorial.md
@@ -11,12 +11,15 @@ nav_order: 5
In this tutorial we'll create a brand new endpoint for the Epidata API:
`fluview_meta`. At a high level, we'll do the following steps:

0. understand the data that we want to surface
1. add the new endpoint to the API server
    - get (transformed) data into a database
    - add internal server maintenance code
    - add frontend code
2. write an integration test for the new endpoint
3. update API documentation for the new endpoint
4. run all unit and integration tests
5. add the new endpoint to the various client libraries

# setup

@@ -44,7 +47,108 @@ tree -L 3 .
   ├── py3tester
   └── undef-analysis
```

# Adding data to a database
Before we can serve data from an API, we need to retrieve it, clean it,
and store it locally. This is known as
[ETL](https://en.wikipedia.org/wiki/Extract,_transform,_load), and for each of
the endpoints the code to do this lives in the [acquisition
folder](https://github.com/cmu-delphi/delphi-epidata/tree/dev/src/acquisition).
Retrieving is the least structured of these steps and depends heavily on the
source of the data. Transforming can be anything from simple cleaning to fit
the format of our database/API, to aggregating to higher geographic or temporal
levels, to correcting for knowable anomalies in the data.

## SQL table design
The first step is determining the format of the tables, which is written in a
[DDL](https://stackoverflow.com/questions/2578194/what-are-ddl-and-dml) file and
stored [here](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/ddl/);
see, for example,
[epimetrics](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/ddl/v4_schema.sql).
Consider prototyping with something like
[dbdiagram](https://dbdiagram.io/d/wastewater_db-6691809a9939893daecc5d57).
Ideally, any redundant values or metadata should be stored in separate related
tables so as to [normalize the
tables](https://en.wikipedia.org/wiki/Database_normalization) and improve
performance. A rule of thumb: if you can think of a column as categorical, it
should have its own table recording the possible categories.

In addition to the primary table and its related tables, it can be useful to
include a loading table to ease addition of new data to the database, a latest
table to speed up access when only the latest data is needed, and several views
for TODO reasons.

Another design consideration is the addition of indexes based on likely queries.
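
As an illustration only (not the actual wastewater or epimetrics schema), here
is roughly what a normalized pair of tables plus an index might look like,
expressed with SQLAlchemy Core; every table and column name below is
hypothetical:

```python
from sqlalchemy import (MetaData, Table, Column, Integer, Float, String,
                        ForeignKey, Index)

metadata = MetaData()

# one row per possible category value, instead of repeating the string everywhere
geo_type = Table(
    "geo_type", metadata,
    Column("geo_type_id", Integer, primary_key=True, autoincrement=True),
    Column("name", String(32), unique=True, nullable=False),
)

# the main fact table references the category table by key
signal = Table(
    "signal", metadata,
    Column("signal_id", Integer, primary_key=True, autoincrement=True),
    Column("geo_type_id", Integer, ForeignKey("geo_type.geo_type_id"), nullable=False),
    Column("reference_date", Integer, nullable=False),  # yyyymmdd integer
    Column("value", Float),
)

# an index chosen to match the most likely query pattern
Index("ix_signal_geo_date", signal.c.geo_type_id, signal.c.reference_date)
```
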
### Data format
In many endpoints, dates are represented using integers as `yyyymmdd` for actual
dates and `yyyyww` for epiweeks.
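
For instance (a small illustration, not endpoint-specific code):

```python
from datetime import date

d = date(2020, 4, 28)
date_int = int(d.strftime("%Y%m%d"))  # 20200428, the yyyymmdd integer form
# epiweeks use the yyyyww form (e.g. 202018); a package such as `epiweeks`
# can compute the week number from a date
```
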
### Versioning
If there is a possibility that you will insert versions older than the latest
one, it is best practice to include a boolean column in the load table
indicating whether each row is the latest version. This column will also be
useful for generating a view of the full table.
## ETL
After you know the target format, you should start writing methods to perform
each step of the ETL process. Eventually, they should be called from a
`__main__` entry point in `src/acquisition/<endpoint_name>/` (ideally
`<endpoint_name>_main.py`); a sketch follows below. You should partition your
code into separate files for each step of ETL, especially if the transform
steps are more than simple data cleaning.
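
For concreteness, here is a minimal, hypothetical sketch of such an entry
point; the module, function, and endpoint names are placeholders rather than
the actual wastewater code:

```python
# src/acquisition/<endpoint_name>/<endpoint_name>_main.py (hypothetical sketch)
from delphi.epidata.acquisition.example.extract import fetch_raw_data
from delphi.epidata.acquisition.example.transform import clean_and_aggregate
from delphi.epidata.acquisition.example.load import load_to_database


def main():
    raw = fetch_raw_data()                  # extract
    transformed = clean_and_aggregate(raw)  # transform
    load_to_database(transformed)           # load


if __name__ == "__main__":
    main()
```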

## Extract
There is not much to be said about extraction in general; depending on how you
get your data, see (TODO list of endpoints based on how they extract data) for
an example, though there is no guarantee that those examples use the optimal
approach for their particular method of acquiring data.

One less obvious aspect of the extraction step is validation. Make sure to add
validation checks to the extraction module, with any violations getting recorded
to a logger object.

Another less obvious extraction step is recording the (approximately) raw data
in a compressed format. This makes recovery from validation or other errors
much easier.
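
A minimal sketch of such an archiving step, with a hypothetical directory and
naming scheme:

```python
import gzip
from datetime import datetime, timezone


def archive_raw(payload: bytes, archive_dir: str = "/tmp/example_archive") -> str:
    """Save the raw payload, compressed and timestamped, for later recovery."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    path = f"{archive_dir}/raw_{stamp}.csv.gz"
    with gzip.open(path, "wb") as f:
        f.write(payload)
    return path
```
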
## Transform
If you will be doing significant transformations, consider writing an external
package for that.

One of the more common transformation steps is the removal of redundant
versions. Epimetrics handles this by first exporting the transformed data to
CSVs (for every source, as handled in covidcast-indicators), then comparing
them with the previously saved copies and keeping only the newer values.
Wastewater handles this entirely within SQL by comparing the latest table with
the load table.
<!-- TODO ongoing update -->
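
A rough sketch of the CSV-diff approach (epimetrics-style), with hypothetical
file names and a simplified key; this is illustrative, not the actual
implementation:

```python
import pandas as pd


def keep_changed_rows(new_df: pd.DataFrame, previous_csv: str,
                      key_cols: list, value_cols: list) -> pd.DataFrame:
    """Return only rows that are new, or whose values changed, since the last export."""
    try:
        old_df = pd.read_csv(previous_csv)
    except FileNotFoundError:
        return new_df  # nothing saved yet, so everything is new
    merged = new_df.merge(old_df, on=key_cols, how="left",
                          suffixes=("", "_old"), indicator=True)
    is_new = merged["_merge"] == "left_only"
    changed = False
    for col in value_cols:
        changed = changed | (merged[col] != merged[f"{col}_old"])
    return new_df[(is_new | changed).to_numpy()]
```
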
## Load
In general, we use [Core
SQLAlchemy](https://docs.sqlalchemy.org/en/20/tutorial/index.html) to manage
database connections. The process for loading should roughly be as follows.
### Move the data into the load table
The current recommendation is to use [pandas'
`to_sql`](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html)
with the `method` argument set to `"multi"` for the initial insertion into the
`load` table, for ease of writing. If this proves too slow, [see
epimetrics](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/acquisition/covidcast/database.py)
for an alternative using approximately raw SQL, or write a [custom insert
method](https://pandas.pydata.org/docs/user_guide/io.html#io-sql-method) that,
e.g., uses temporary CSVs.
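
A minimal sketch of that first insertion, with hypothetical table and column
names:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://user:pass@127.0.0.1:13306/epidata")

# a tiny stand-in for the cleaned DataFrame produced by the transform step
transformed = pd.DataFrame({
    "geo_type": ["state", "state"],
    "reference_date": [20200428, 20200428],
    "value": [1.5, 2.5],
})

transformed.to_sql(
    "signal_load",        # hypothetical load-table name
    engine,
    if_exists="append",   # append to the load table rather than replace it
    index=False,
    method="multi",       # batch many rows into each INSERT statement
    chunksize=5000,
)
```
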
### Move categorical data
After inserting into the load table, any new values for the related tables,
such as `signal` or `geo_type`, need to be added to those tables, as in the
sketch below.
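
A minimal sketch of that step, continuing with the `engine` from above and
using a raw SQL statement through SQLAlchemy; the `geo_type` and `signal_load`
names are hypothetical:

```python
from sqlalchemy import text

with engine.begin() as conn:
    # add any geo_type values present in the load table but missing from the
    # related table; existing values are left untouched
    conn.execute(text("""
        INSERT INTO geo_type (name)
        SELECT DISTINCT sl.geo_type
        FROM signal_load sl
        WHERE sl.geo_type NOT IN (SELECT name FROM geo_type)
    """))
```
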
### Insert load data into the full and latest tables
This step is fairly straightforward. Note that the keys for the related tables need to be added either during or before insertion into either table.

Note that wastewater removes duplicated values with different versions just after adding the key values from the related tables.
### Remove the inserted data from the load table
Since the id of the load table is used to set the id of the full and latest tables, it is important to `DELETE` these rows rather than dropping or truncating the table, since the latter would reset the auto-increment counter.
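
A sketch of that cleanup, again with a hypothetical table name; note the use of
`DELETE` rather than `TRUNCATE` or `DROP`:

```python
from sqlalchemy import text

with engine.begin() as conn:
    # DELETE preserves the table's AUTO_INCREMENT counter;
    # TRUNCATE or DROP would reset it and break the shared ids
    conn.execute(text("DELETE FROM signal_load"))
```
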
## Logging
Throughout our repos we use [structlog](https://www.structlog.org/en/stable/)
for logging, with some standard configuration constructed by
[`get_structured_logger`](https://github.com/cmu-delphi/covidcast-indicators/blob/1451e0742169e6e8909da4059cb37a17d397145f/_delphi_utils_python/delphi_utils/logger.py#L48).
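
For example, assuming the acquisition code imports `get_structured_logger` as
covidcast-indicators does (a minimal sketch):

```python
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger(__name__)
# keyword arguments become structured fields attached to the log event
logger.info("inserted rows into the load table", n_rows=1234, table="signal_load")
```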

## Unit Testing
We need to test that the functions we're adding cover the expected kinds of data, and raise appropriate errors when the data is invalid. To do this, we'll add some unit tests.

To test the logs generated by this system in unit tests, create a `logger`
object as normal with `get_structured_logger`, and then wrap any lines whose
logs you want to test in `with capture_logs() as cap_logs:`, as done in the
[first example here](https://www.structlog.org/en/latest/testing.html), or in
[the wastewater
endpoint](https://github.com/cmu-delphi/delphi-epidata/tree/511607f1560af0a578ce9fc8b580f4a18ef48916/tests/acquisition/wastewater/test_nwss_formatting.py).
<!-- TODO this link is dead for now until this branch is merged. -->
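
A minimal sketch of that pattern (assuming `get_structured_logger` is
importable as in covidcast-indicators):

```python
from structlog.testing import capture_logs
from delphi_utils.logger import get_structured_logger

logger = get_structured_logger(__name__)

with capture_logs() as cap_logs:
    logger.warning("dropped malformed rows", n_dropped=3)

# each captured entry is a dict holding the event name and any bound key-value pairs
assert cap_logs[0]["event"] == "dropped malformed rows"
assert cap_logs[0]["n_dropped"] == 3
```
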
# the data

Here's the requirement: we need to quickly surface the most recent "issue"
Expand All @@ -62,15 +166,79 @@ the following:
- latest "issue", which is the publication epiweek
- total size of the table

## Acquire data
If, unlike `fluview`, you need to acquire new data in addition to adding a new
endpoint, you will need to add an appropriate data ingestion method.

Since we're using the `fluview` table, we're piggybacking off of [src/acquisition/fluview](https://github.com/cmu-delphi/delphi-epidata/tree/dev/src/acquisition/fluview).
To run ingestion, Cronicle runs [fluview_update.py](https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/acquisition/fluview/fluview_update.py), while the other scripts in that folder provide supporting methods.
### Secrets
If you are pulling from an API or other source which needs authentication, you will need to add your secret into the backend. How to go about this for new endpoints is TODO.
## Tests
It is recommended to use a dummy database as a part of unit testing; for an example see TODO
## Adding new packages
If for whatever reason you need to add a new dependency TODO
# update the server API

1. create a new file in `/src/server/endpoints/`, e.g. `fluview_meta.py`, or copy an existing one (a minimal sketch follows below)
2. edit the created file's `Blueprint("fluview_meta", __name__)` so that the first argument matches the target endpoint name
3. edit the existing `/src/server/endpoints/__init__.py` to add the newly created file to the imports (top) and to the list of endpoints (below)
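
A heavily simplified, hypothetical sketch of such a file; the real endpoint
modules use the server's shared query-building helpers rather than a bare
handler:

```python
# src/server/endpoints/fluview_meta.py (simplified sketch, not the real implementation)
from flask import Blueprint

# the Blueprint name must match the endpoint name
bp = Blueprint("fluview_meta", __name__)


@bp.route("/", methods=("GET", "POST"))
def handle():
    # the real handler builds and runs a database query via the server's
    # shared helpers, then serializes the rows in the epidata response format
    ...
```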


# add an integration test

Now that we've changed several files, we need to make sure that the changes
work as intended _before_ submitting code for review or committing code to the
repository. Given that the code spans multiple components and languages, this
needs to be an integration test. See more about integration testing in Delphi's
[frontend development guide](https://github.com/cmu-delphi/operations/blob/main/docs/frontend_development.md#integration).

Create an integration test for the new endpoint by creating a new file,
`integrations/server/test_fluview_meta.py`. There's a good amount of
boilerplate, but fortunately, this can be copied _almost_ verbatim from the
[`fluview` endpoint integration test](https://github.com/cmu-delphi/delphi-epidata/blob/main/integrations/server/test_fluview.py).

Include the following pieces:

- top-level docstring (update name to `fluview_meta`)
- the imports section (no changes needed)
- the test class (update name and docstring for `fluview_meta`)
- the methods `setUpClass`, `setUp`, and `tearDown` (no changes needed)

Add the following test method, which creates some dummy data, fetches the new
`fluview_meta` endpoint using the Python client library, and asserts that the
returned value is what we expect.

```python
def test_round_trip(self):
"""Make a simple round-trip with some sample data."""

# insert dummy data
self.cur.execute('''
insert into fluview values
(0, "2020-04-07", 202021, 202020, "nat", 1, 2, 3, 4, 3.14159, 1.41421,
10, 11, 12, 13, 14, 15),
(0, "2020-04-28", 202022, 202022, "hhs1", 5, 6, 7, 8, 1.11111, 2.22222,
20, 21, 22, 23, 24, 25)
''')
self.cnx.commit()

# make the request
response = Epidata.fluview_meta()

# assert that the right data came back
self.assertEqual(response, {
'result': 1,
'epidata': [{
'latest_update': '2020-04-28',
'latest_issue': 202022,
'table_rows': 2,
}],
'message': 'success',
})
```

# update the client libraries
<!-- TODO this section is very much out of date-->
There are currently four client libraries. They all need to be updated to make
the new `fluview_meta` endpoint available to callers. The pattern is very
similar for all endpoints so that copy-paste will get you 90% of the way there.
@@ -151,59 +319,6 @@ Here's what we add to each client:
fluview_meta = fluview_meta,
```


# write documentation

This consists of two steps: add a new document for the `fluview_meta` endpoint,
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -24,3 +24,7 @@ include = 'server,tests/server'

[tool.pylint.'DESIGN']
ignored-argument-names = ['(_.*|run_as_module)']
[tool.pytest.ini_options]
pythonpath = [
".", "src"
]
3 changes: 3 additions & 0 deletions src/acquisition/wastewater/__init__.py
@@ -0,0 +1,3 @@
from os import path

WASTEWATER_DDL = path.join(path.dirname(__file__), "../../ddl/wastewater.sql")