Skip to content

Commit

Permalink
Merge popularity calculations and data refresh into a single DAG (#496)
Browse files Browse the repository at this point in the history
* First pass at combining popularity calc, matview refresh, and data refresh DAGs

* Remove the old popularity refresh DAGs

* Use the data_refresh timeout from the configuration in the DAG

* Only force a single pool slot for the wait_for_data_refresh task

* Refactor TaskGroups out into factories and update docs

* Update docs, fix conditional

* Add config option to force metrics refresh when not the first of the month

* Fix log message

* Remove deleted DAG tests

* Use timeout from data refresh config

* Document the force_refresh_metrics option

* Add link to this issue in the docs for reference

* Safely get options from config, fix timeout

* Consider dagrun first of the month if previous runs failed

* Don't refresh metrics is option is explicitly configured to False

If `force_refresh_metrics` is explictly configured to False (rather
than omitted), then do not refresh the popularity metrics even if
this is the first successful run of the month.

This option could be helpful if, for example, the first dagrun of the
month succeeds during the popularity steps but fails during the data
refresh. When we manually re-run the DAG, we can save time by skipping
this step.

* Use current dagrun start_date instead of datetime.now()

* Test the month_check operator

* Fix dates in tests, only clean up DagRuns associated to the test Dag

* Handle case where there isn't a successful previous run

* Make task names more explicit

* Clean up unused param

* Remove unused param in tests as well

* Fix type, pull out constant

* Clean up queries and operators

* Add docs to tasks

* Add type for media_type

* Inline docs and CamelCase type

* Remove MediaType type for now

* Clarify flow of data through the data refresh in comments

* Fix type string

* Update timeouts for popularity refresh tasks
  • Loading branch information
stacimc authored May 13, 2022
1 parent 2045881 commit 3ef7138
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 416 deletions.
3 changes: 3 additions & 0 deletions openverse_catalog/dags/common/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from datetime import datetime, timedelta

from common import slack
Expand All @@ -18,3 +19,5 @@
"on_failure_callback": slack.on_failure_callback,
}
XCOM_PULL_TEMPLATE = "{{{{ ti.xcom_pull(task_ids='{}', key='{}') }}}}"

POSTGRES_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing")
45 changes: 18 additions & 27 deletions openverse_catalog/dags/common/popularity/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,22 @@
logger = logging.getLogger(__name__)


DROP_MEDIA_POPULARITY_RELATIONS_TASK_ID = "drop_media_popularity_relations"
DROP_MEDIA_POPULARITY_FUNCTIONS_TASK_ID = "drop_media_popularity_functions"
CREATE_MEDIA_POPULARITY_METRICS_TASK_ID = "create_media_popularity_metrics_table"
UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID = "update_media_popularity_metrics_table"
CREATE_MEDIA_POPULARITY_PERCENTILE_TASK_ID = "create_media_popularity_percentile"
CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "create_media_popularity_constants_view"
CREATE_MEDIA_STANDARDIZED_POPULARITY_TASK_ID = "create_media_standardized_popularity"
CREATE_DB_VIEW_TASK_ID = "create_materialized_popularity_view"


def drop_media_popularity_relations(
postgres_conn_id,
media_type="image",
):
return PythonOperator(
task_id="drop_media_popularity_relations",
task_id=DROP_MEDIA_POPULARITY_RELATIONS_TASK_ID,
python_callable=sql.drop_media_popularity_relations,
op_args=[postgres_conn_id, media_type],
)
Expand All @@ -23,7 +33,7 @@ def drop_media_popularity_functions(
media_type="image",
):
return PythonOperator(
task_id=f"drop_{media_type}_popularity_functions",
task_id=DROP_MEDIA_POPULARITY_FUNCTIONS_TASK_ID,
python_callable=sql.drop_media_popularity_functions,
op_args=[postgres_conn_id, media_type],
)
Expand All @@ -34,7 +44,7 @@ def create_media_popularity_metrics(
media_type="image",
):
return PythonOperator(
task_id=f"create_{media_type}_popularity_metrics_table",
task_id=CREATE_MEDIA_POPULARITY_METRICS_TASK_ID,
python_callable=sql.create_media_popularity_metrics,
op_args=[postgres_conn_id, media_type],
)
Expand All @@ -45,7 +55,7 @@ def update_media_popularity_metrics(
media_type="image",
):
return PythonOperator(
task_id=f"update_{media_type}_popularity_metrics_table",
task_id=UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID,
python_callable=sql.update_media_popularity_metrics,
op_args=[postgres_conn_id, media_type],
)
Expand All @@ -56,7 +66,7 @@ def create_media_popularity_percentile(
media_type="image",
):
return PythonOperator(
task_id=f"create_{media_type}_popularity_percentile",
task_id=CREATE_MEDIA_POPULARITY_PERCENTILE_TASK_ID,
python_callable=sql.create_media_popularity_percentile_function,
op_args=[postgres_conn_id, media_type],
)
Expand All @@ -67,45 +77,26 @@ def create_media_popularity_constants(
media_type="image",
):
return PythonOperator(
task_id=f"create_{media_type}_popularity_constants_view",
task_id=CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID,
python_callable=sql.create_media_popularity_constants_view,
op_args=[postgres_conn_id, media_type],
)


def update_media_popularity_constants(
postgres_conn_id,
media_type="image",
):
return PythonOperator(
task_id=f"update_{media_type}_popularity_constants_view",
python_callable=sql.update_media_popularity_constants,
op_args=[postgres_conn_id, media_type],
)


def create_media_standardized_popularity(
postgres_conn_id,
media_type="image",
):
return PythonOperator(
task_id=f"create_{media_type}_standardized_popularity",
task_id=CREATE_MEDIA_STANDARDIZED_POPULARITY_TASK_ID,
python_callable=sql.create_standardized_media_popularity_function,
op_args=[postgres_conn_id, media_type],
)


def create_db_view(postgres_conn_id, media_type="image"):
return PythonOperator(
task_id=f"create_{media_type}_view",
task_id=CREATE_DB_VIEW_TASK_ID,
python_callable=sql.create_media_view,
op_args=[postgres_conn_id, media_type],
)


def update_db_view(postgres_conn_id, media_type="image"):
return PythonOperator(
task_id=f"update_{media_type}_view",
python_callable=sql.update_db_view,
op_args=[postgres_conn_id, media_type],
)
Loading

0 comments on commit 3ef7138

Please sign in to comment.