Skip to content

Commit

Permalink
Update dag docs
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Mar 12, 2024
1 parent 836b74c commit 608939e
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 50 deletions.
4 changes: 2 additions & 2 deletions catalog/dags/common/sensors/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@


# DagTags used to establish a concurrency group for each environment
PRODUCTION_ES_CONCURRENCY_TAG = "production_concurrency"
STAGING_ES_CONCURRENCY_TAG = "staging_concurrency"
PRODUCTION_ES_CONCURRENCY_TAG = "production_es_concurrency"
STAGING_ES_CONCURRENCY_TAG = "staging_es_concurrency"

ES_CONCURRENCY_TAGS = {
PRODUCTION: PRODUCTION_ES_CONCURRENCY_TAG,
Expand Down
5 changes: 3 additions & 2 deletions catalog/dags/data_refresh/create_filtered_index_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@
There are two mechanisms that prevent this from happening:
1. The filtered index creation DAGs are not allowed to run if a data refresh
for the media type is already running.
1. The filtered index creation DAGs fail immediately if any of the DAGs that are
tagged as prt of the `production-es-concurrency` group (including the data
refreshes) are currently running.
2. The data refresh DAGs will wait for any pre-existing filtered index creation
DAG runs for the media type to finish before continuing.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
# Update the staging database
# Staging Database Restore DAG
This DAG is responsible for updating the staging database using the most recent
snapshot of the production database.
Expand All @@ -19,6 +19,11 @@
(e.g. `aws_rds`)
- `AIRFLOW_CONN_<ID>`: The connection string to use for RDS operations (per the above
example, it might be `AIRFLOW_CONN_AWS_RDS`)
## Race conditions
Because this DAG completely replaces the staging database, it first waits on any
running DAGs that are tagged as part of the `staging_es_concurrency` group.
"""

import logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@
}
}
```
## Race conditions
Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)
"""

import logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@
## Race conditions
Because this DAG runs on the staging elasticsearch cluster, it does _not_ interfere
with the `data_refresh` or `create_filtered_index` DAGs.
However, as the DAG operates on the staging API database it will exit
immediately if any of the following DAGs are running:
* `staging_database_restore`
* `recreate_full_staging_index`
* `create_new_staging_es_index`
with the production `data_refresh` or `create_filtered_index` DAGs. However, it will
fail immediately if any of the DAGs tagged as part of the `staging-es-concurrency`
group are running.
"""

from datetime import datetime, timedelta
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
## When this DAG runs
This DAG is on a `None` schedule and is run manually.
## Race conditions
Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`point_staging_alias` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)
"""

from datetime import datetime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@
cluster, it does _not_ interfere with the `data_refresh` or
`create_filtered_index` DAGs.
However, as the DAG operates on the staging API database it will exit
immediately if any of the following DAGs are running:
* `staging_database_restore`
* `create_proportional_by_provider_staging_index`
* `create_new_staging_es_index`
However, as the DAG operates on the staging API database and ES cluster it will exit
immediately if any of the DAGs tagged as part of the `staging_es_concurrency` group
are already running.
"""

from datetime import datetime
Expand Down
96 changes: 63 additions & 33 deletions documentation/catalog/reference/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,25 @@ The following are DAGs grouped by their primary tag:

### Database

| DAG ID | Schedule Interval |
| --------------------------------------------------------------------------------------------- | ----------------- |
| [`batched_update`](#batched_update) | `None` |
| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` |
| [`delete_records`](#delete_records) | `None` |
| [`point_production_es_alias`](#point_production_es_alias) | `None` |
| [`point_staging_es_alias`](#point_staging_es_alias) | `None` |
| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` |
| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` |
| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` |
| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` |
| [`staging_database_restore`](#staging_database_restore) | `@monthly` |
| DAG ID | Schedule Interval |
| --------------------------------------------------------------------------------- | ----------------- |
| [`batched_update`](#batched_update) | `None` |
| [`delete_records`](#delete_records) | `None` |
| [`recreate_audio_popularity_calculation`](#recreate_audio_popularity_calculation) | `None` |
| [`recreate_full_staging_index`](#recreate_full_staging_index) | `None` |
| [`recreate_image_popularity_calculation`](#recreate_image_popularity_calculation) | `None` |
| [`report_pending_reported_media`](#report_pending_reported_media) | `@weekly` |
| [`staging_database_restore`](#staging_database_restore) | `@monthly` |

### Elasticsearch

| DAG ID | Schedule Interval |
| ----------------------------------------------------------------------------------------------- | ----------------- |
| [`create_new_production_es_index`](#create_new_production_es_index) | `None` |
| [`create_new_staging_es_index`](#create_new_staging_es_index) | `None` |
| [`create_proportional_by_source_staging_index`](#create_proportional_by_source_staging_index) | `None` |
| [`point_production_es_alias`](#point_production_es_alias) | `None` |
| [`point_staging_es_alias`](#point_staging_es_alias) | `None` |
| [`production_elasticsearch_cluster_healthcheck`](#production_elasticsearch_cluster_healthcheck) | `*/15 * * * *` |
| [`staging_elasticsearch_cluster_healthcheck`](#staging_elasticsearch_cluster_healthcheck) | `*/15 * * * *` |

Expand Down Expand Up @@ -441,8 +441,9 @@ source from which to pull documents.

There are two mechanisms that prevent this from happening:

1. The filtered index creation DAGs are not allowed to run if a data refresh for
the media type is already running.
1. The filtered index creation DAGs fail immediately if any of the DAGs that are
tagged as prt of the `production-es-concurrency` group (including the data
refreshes) are currently running.
2. The data refresh DAGs will wait for any pre-existing filtered index creation
DAG runs for the media type to finish before continuing.

Expand Down Expand Up @@ -496,8 +497,9 @@ source from which to pull documents.

There are two mechanisms that prevent this from happening:

1. The filtered index creation DAGs are not allowed to run if a data refresh for
the media type is already running.
1. The filtered index creation DAGs fail immediately if any of the DAGs that are
tagged as prt of the `production-es-concurrency` group (including the data
refreshes) are currently running.
2. The data refresh DAGs will wait for any pre-existing filtered index creation
DAG runs for the media type to finish before continuing.

Expand Down Expand Up @@ -608,6 +610,13 @@ The resulting, merged configuration will be:
}
```

##### Race conditions

Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)

### `create_new_staging_es_index`

#### Create New ES Index DAG
Expand Down Expand Up @@ -712,6 +721,13 @@ The resulting, merged configuration will be:
}
```

##### Race conditions

Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`create_new_staging_es_index` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)

### `create_proportional_by_source_staging_index`

#### Create Proportional By Source Staging Index DAG
Expand Down Expand Up @@ -740,16 +756,10 @@ This DAG is on a `None` schedule and is run manually.

##### Race conditions

Because this DAG runs on the staging ingestion server and staging elasticsearch
cluster, it does _not_ interfere with the `data_refresh` or
`create_filtered_index` DAGs.

However, as the DAG operates on the staging API database it will exit
immediately if any of the following DAGs are running:

- `staging_database_restore`
- `recreate_full_staging_index`
- `create_new_staging_es_index`
Because this DAG runs on the staging elasticsearch cluster, it does _not_
interfere with the production `data_refresh` or `create_filtered_index` DAGs.
However, it will fail immediately if any of the DAGs tagged as part of the
`staging-es-concurrency` group are running.

### `delete_records`

Expand Down Expand Up @@ -1084,7 +1094,18 @@ also delete that index afterward.

This DAG is on a `None` schedule and is run manually.

<<<<<<< HEAD
### `point_staging_es_alias`
=======
##### Race conditions

Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`point_staging_alias` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)

### `point_staging_alias`
>>>>>>> be549cdee (Update dag docs)
#### Point ES Alias DAG

Expand All @@ -1100,6 +1121,13 @@ also delete that index afterward.

This DAG is on a `None` schedule and is run manually.

##### Race conditions

Each DAG will fail immediately if any of the DAGs tagged as part of the
es-concurrency group for the DAG's environment is running. (E.g., the
`point_staging_alias` DAG fails immediately if any DAGs tagged with
`staging-es-concurrency` are running.)

### `pr_review_reminders`

#### PR Review Reminders
Expand Down Expand Up @@ -1205,12 +1233,9 @@ Because this DAG runs on the staging ingestion server and staging elasticsearch
cluster, it does _not_ interfere with the `data_refresh` or
`create_filtered_index` DAGs.

However, as the DAG operates on the staging API database it will exit
immediately if any of the following DAGs are running:

- `staging_database_restore`
- `create_proportional_by_provider_staging_index`
- `create_new_staging_es_index`
However, as the DAG operates on the staging API database and ES cluster it will
exit immediately if any of the DAGs tagged as part of the
`staging_es_concurrency` group are already running.

### `recreate_image_popularity_calculation`

Expand Down Expand Up @@ -1290,7 +1315,7 @@ Notes: https://www.smk.dk/en/article/smk-api/

### `staging_database_restore`

#### Update the staging database
#### Staging Database Restore DAG

This DAG is responsible for updating the staging database using the most recent
snapshot of the production database.
Expand All @@ -1312,6 +1337,11 @@ the RDS operations run using a different hook:
- `AIRFLOW_CONN_<ID>`: The connection string to use for RDS operations (per the
above example, it might be `AIRFLOW_CONN_AWS_RDS`)

##### Race conditions

Because this DAG completely replaces the staging database, it first waits on any
running DAGs that are tagged as part of the `staging_es_concurrency` group.

### `staging_elasticsearch_cluster_healthcheck`

Monitor staging and production Elasticsearch cluster health endpoint.
Expand Down

0 comments on commit 608939e

Please sign in to comment.