diff --git a/openverse_catalog/dags/common/constants.py b/openverse_catalog/dags/common/constants.py index 92e6a9956ef..db3a5e8ef7d 100644 --- a/openverse_catalog/dags/common/constants.py +++ b/openverse_catalog/dags/common/constants.py @@ -1,3 +1,4 @@ +import os from datetime import datetime, timedelta from common import slack @@ -18,3 +19,5 @@ "on_failure_callback": slack.on_failure_callback, } XCOM_PULL_TEMPLATE = "{{{{ ti.xcom_pull(task_ids='{}', key='{}') }}}}" + +POSTGRES_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") diff --git a/openverse_catalog/dags/common/popularity/operators.py b/openverse_catalog/dags/common/popularity/operators.py index 4a858eec874..5d4fbe37bf0 100644 --- a/openverse_catalog/dags/common/popularity/operators.py +++ b/openverse_catalog/dags/common/popularity/operators.py @@ -7,12 +7,22 @@ logger = logging.getLogger(__name__) +DROP_MEDIA_POPULARITY_RELATIONS_TASK_ID = "drop_media_popularity_relations" +DROP_MEDIA_POPULARITY_FUNCTIONS_TASK_ID = "drop_media_popularity_functions" +CREATE_MEDIA_POPULARITY_METRICS_TASK_ID = "create_media_popularity_metrics_table" +UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID = "update_media_popularity_metrics_table" +CREATE_MEDIA_POPULARITY_PERCENTILE_TASK_ID = "create_media_popularity_percentile" +CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "create_media_popularity_constants_view" +CREATE_MEDIA_STANDARDIZED_POPULARITY_TASK_ID = "create_media_standardized_popularity" +CREATE_DB_VIEW_TASK_ID = "create_materialized_popularity_view" + + def drop_media_popularity_relations( postgres_conn_id, media_type="image", ): return PythonOperator( - task_id="drop_media_popularity_relations", + task_id=DROP_MEDIA_POPULARITY_RELATIONS_TASK_ID, python_callable=sql.drop_media_popularity_relations, op_args=[postgres_conn_id, media_type], ) @@ -23,7 +33,7 @@ def drop_media_popularity_functions( media_type="image", ): return PythonOperator( - task_id=f"drop_{media_type}_popularity_functions", + task_id=DROP_MEDIA_POPULARITY_FUNCTIONS_TASK_ID, python_callable=sql.drop_media_popularity_functions, op_args=[postgres_conn_id, media_type], ) @@ -34,7 +44,7 @@ def create_media_popularity_metrics( media_type="image", ): return PythonOperator( - task_id=f"create_{media_type}_popularity_metrics_table", + task_id=CREATE_MEDIA_POPULARITY_METRICS_TASK_ID, python_callable=sql.create_media_popularity_metrics, op_args=[postgres_conn_id, media_type], ) @@ -45,7 +55,7 @@ def update_media_popularity_metrics( media_type="image", ): return PythonOperator( - task_id=f"update_{media_type}_popularity_metrics_table", + task_id=UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID, python_callable=sql.update_media_popularity_metrics, op_args=[postgres_conn_id, media_type], ) @@ -56,7 +66,7 @@ def create_media_popularity_percentile( media_type="image", ): return PythonOperator( - task_id=f"create_{media_type}_popularity_percentile", + task_id=CREATE_MEDIA_POPULARITY_PERCENTILE_TASK_ID, python_callable=sql.create_media_popularity_percentile_function, op_args=[postgres_conn_id, media_type], ) @@ -67,29 +77,18 @@ def create_media_popularity_constants( media_type="image", ): return PythonOperator( - task_id=f"create_{media_type}_popularity_constants_view", + task_id=CREATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID, python_callable=sql.create_media_popularity_constants_view, op_args=[postgres_conn_id, media_type], ) -def update_media_popularity_constants( - postgres_conn_id, - media_type="image", -): - return PythonOperator( - task_id=f"update_{media_type}_popularity_constants_view", - 
python_callable=sql.update_media_popularity_constants, - op_args=[postgres_conn_id, media_type], - ) - - def create_media_standardized_popularity( postgres_conn_id, media_type="image", ): return PythonOperator( - task_id=f"create_{media_type}_standardized_popularity", + task_id=CREATE_MEDIA_STANDARDIZED_POPULARITY_TASK_ID, python_callable=sql.create_standardized_media_popularity_function, op_args=[postgres_conn_id, media_type], ) @@ -97,15 +96,7 @@ def create_media_standardized_popularity( def create_db_view(postgres_conn_id, media_type="image"): return PythonOperator( - task_id=f"create_{media_type}_view", + task_id=CREATE_DB_VIEW_TASK_ID, python_callable=sql.create_media_view, op_args=[postgres_conn_id, media_type], ) - - -def update_db_view(postgres_conn_id, media_type="image"): - return PythonOperator( - task_id=f"update_{media_type}_view", - python_callable=sql.update_db_view, - op_args=[postgres_conn_id, media_type], - ) diff --git a/openverse_catalog/dags/data_refresh/dag_factory.py b/openverse_catalog/dags/data_refresh/dag_factory.py index 0987231c9cc..b76b1bd954c 100644 --- a/openverse_catalog/dags/data_refresh/dag_factory.py +++ b/openverse_catalog/dags/data_refresh/dag_factory.py @@ -1,135 +1,144 @@ """ # Data Refresh DAG Factory This file generates our data refresh DAGs using a factory function. -These DAGs initiate a data refresh for a given media type and awaits the -success or failure of the refresh. Importantly, they are also configured to -ensure that no two data refresh DAGs can run concurrently, as required by -the server. - -A data refresh occurs on the data refresh server in the openverse-api project. -This is a task which imports data from the upstream Catalog database into the -API, copies contents to a new Elasticsearch index, and makes the index "live". -This process is necessary to make new content added to the Catalog by our -provider DAGs available on the frontend. You can read more in the [README]( +For the given media type these DAGs will first refresh the popularity data, +then initiate a data refresh on the data refresh server and await the +success or failure of that task. + +Popularity data for each media type is collated in a materialized view. Before +initiating a data refresh, the DAG will first refresh the view in order to +update popularity data for records that have been ingested since the last refresh. +On the first run of the the month, the DAG will also refresh the underlying tables, +including the percentile values and any new popularity metrics. The DAG can also +be run with the `force_refresh_metrics` option to run this refresh after the first +of the month. + +Once this step is complete, the data refresh can be initiated. A data refresh +occurs on the data refresh server in the openverse-api project. This is a task +which imports data from the upstream Catalog database into the API, copies contents +to a new Elasticsearch index, and finally makes the index "live". This process is +necessary to make new content added to the Catalog by our provider DAGs available +to the API. You can read more in the [README]( https://github.com/WordPress/openverse-api/blob/main/ingestion_server/README.md -) - -The DAGs generated by this factory allow us to schedule those refreshes through -Airflow. Since no two refreshes can run simultaneously, all tasks are run in a -special `data_refresh` pool with a single worker slot. 
To ensure that tasks -run in an acceptable order (ie the trigger step for one DAG cannot run if a -previously triggered refresh is still running), each DAG has the following -steps: - -1. The `wait_for_data_refresh` step uses a custom Sensor that will wait until -none of the `external_dag_ids` (corresponding to the other data refresh DAGs) -are 'running'. A DAG is considered to be 'running' if it is itself in the -RUNNING state __and__ its own `wait_for_data_refresh` step has completed -successfully. The Sensor suspends itself and frees up the worker slot if -another data refresh DAG is running. - -2. The `trigger_data_refresh` step then triggers the data refresh by POSTing -to the `/task` endpoint on the data refresh server with relevant data. A -successful response will include the `status_check` url used to check on the -status of the refresh, which is passed on to the next task via XCom. - -3. Finally the `wait_for_data_refresh` task waits for the data refresh to be -complete by polling the `status_url`. Note this task does not need to be -able to suspend itself and free the worker slot, because we want to lock the -entire pool on waiting for a particular data refresh to run. +) Importantly, the data refresh TaskGroup is also configured to handle concurrency +requirements of the data refresh server. You can find more background information on this process in the following issues and related PRs: - [[Feature] Data refresh orchestration DAG]( https://github.com/WordPress/openverse-catalog/issues/353) +- [[Feature] Merge popularity calculations and data refresh into a single DAG]( +https://github.com/WordPress/openverse-catalog/issues/453) """ -import json import logging -import os from typing import Sequence -from urllib.parse import urlparse from airflow import DAG -from airflow.exceptions import AirflowException -from airflow.providers.http.operators.http import SimpleHttpOperator -from airflow.providers.http.sensors.http import HttpSensor -from common.constants import DAG_DEFAULT_ARGS, XCOM_PULL_TEMPLATE -from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor +from airflow.models.dagrun import DagRun +from airflow.operators.python import BranchPythonOperator +from airflow.settings import SASession +from airflow.utils.session import provide_session +from airflow.utils.state import State +from common.constants import DAG_DEFAULT_ARGS +from data_refresh.data_refresh_task_factory import create_data_refresh_task_group from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS, DataRefresh -from requests import Response +from data_refresh.refresh_popularity_metrics_task_factory import ( + GROUP_ID as REFRESH_POPULARITY_METRICS_GROUP_ID, +) +from data_refresh.refresh_popularity_metrics_task_factory import ( + UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID, + create_refresh_popularity_metrics_task_group, +) +from data_refresh.refresh_view_data_task_factory import ( + UPDATE_DB_VIEW_TASK_ID, + create_refresh_view_data_task, +) logger = logging.getLogger(__name__) - -DATA_REFRESH_POOL = "data_refresh" +REFRESH_MATERIALIZED_VIEW_TASK_ID = UPDATE_DB_VIEW_TASK_ID +# The first task in the refresh_popularity_metrics TaskGroup +REFRESH_POPULARITY_METRICS_TASK_ID = ( + f"{REFRESH_POPULARITY_METRICS_GROUP_ID}" + f".{UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID}" +) -def response_filter_data_refresh(response: Response) -> str: +@provide_session +def _month_check(dag_id: str, session: SASession = None) -> str: """ - Filter for the `trigger_data_refresh` task, used to grab the 
endpoint needed - to poll for the status of the triggered data refresh. This information will - then be available via XCom in the downstream tasks. - """ - status_check_url = response.json()["status_check"] - return urlparse(status_check_url).path + Checks whether there has been a previous DagRun this month. If so, + returns the task_id for the matview refresh task; else, returns the + task_id for refresh popularity metrics task. + Required Arguments: -# Response check to the `wait_for_completion` Sensor. Processes the response to -# determine whether the task can complete. -def response_check_wait_for_completion(response: Response) -> bool: + dag_id: id of the currently running Dag """ - Response check to the `wait_for_completion` Sensor. Processes the response to - determine whether the task can complete. - """ - data = response.json() - - if data["active"]: - # The data refresh is still running. Poll again later. - return False + # Get the current DagRun + DR = DagRun + current_dagrun = ( + session.query(DR).filter(DR.dag_id == dag_id, DR.state == State.RUNNING) + ).first() + + # If `force_refresh_metrics` has been passed in the dagrun config, then + # immediately return the task_id to refresh popularity metrics without + # doing the month check. + force_refresh_metrics = current_dagrun.conf.get("force_refresh_metrics") + if force_refresh_metrics is not None: + logger.info(f"`force_refresh_metrics` is set to {force_refresh_metrics}.") + return ( + REFRESH_POPULARITY_METRICS_TASK_ID + if force_refresh_metrics + else REFRESH_MATERIALIZED_VIEW_TASK_ID + ) - if data["error"]: - raise AirflowException("Error triggering data refresh.") + # Get the most recent successful dagrun for this Dag + latest_dagrun = ( + session.query(DR) + .filter(DR.dag_id == dag_id, DR.state == State.SUCCESS) + .order_by(DR.start_date.desc()) + ).first() + + # No previous successful dagrun, refresh all popularity data. + if latest_dagrun is None: + return REFRESH_POPULARITY_METRICS_TASK_ID + + # Check if the last dagrun was in the same month as the current run + current_date = current_dagrun.start_date + last_dagrun_date = latest_dagrun.start_date + is_last_dagrun_in_current_month = ( + current_date.month == last_dagrun_date.month + and current_date.year == last_dagrun_date.year + ) - logger.info( - f"Data refresh done with {data['percent_completed']}% \ - completed." + return ( + REFRESH_POPULARITY_METRICS_TASK_ID + if not is_last_dagrun_in_current_month + else REFRESH_MATERIALIZED_VIEW_TASK_ID ) - return True def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequence[str]): """ - This factory method instantiates a DAG that will run the data refresh for - the given `media_type`. - - A data refresh runs for a given media type in the API DB. It imports the - data for that type from the upstream DB in the Catalog, reindexes the data, - and updates and reindex Elasticsearch. - - A data refresh can only be performed for one media type at a time, so the DAG - must also use a Sensor to make sure that no two data refresh tasks run - concurrently. - - It is intended that the data_refresh tasks, or at least the initial - `wait_for_data_refresh` tasks, should be run in a custom pool with 1 worker - slot. This enforces that no two `wait_for_data_refresh` tasks can start - concurrently and enter a race condition. + This factory method instantiates a DAG that will run the popularity calculation and + subsequent data refresh for the given `media_type`. 
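For readers skimming the diff, the branch selection implemented by `_month_check` above reduces to a small rule. A hypothetical, simplified restatement follows (the real task resolves these inputs by querying Airflow's `DagRun` table, and it returns the module-level task-id constants rather than the placeholder strings used here):

```python
from datetime import datetime
from typing import Optional


def pick_first_task(
    force_refresh_metrics: Optional[bool],
    last_success: Optional[datetime],
    now: datetime,
) -> str:
    # Simplified sketch of `_month_check`; the placeholder strings stand in
    # for REFRESH_POPULARITY_METRICS_TASK_ID and REFRESH_MATERIALIZED_VIEW_TASK_ID.
    if force_refresh_metrics is not None:
        # An explicit `force_refresh_metrics` in the DagRun conf wins outright.
        return "refresh_metrics" if force_refresh_metrics else "refresh_matview"
    if last_success is None:
        # No previous successful run: refresh all popularity data.
        return "refresh_metrics"
    # Otherwise, refresh the metrics only on the first run of a new month.
    same_month = (last_success.year, last_success.month) == (now.year, now.month)
    return "refresh_matview" if same_month else "refresh_metrics"


# pick_first_task(None, datetime(2022, 2, 1), datetime(2022, 3, 2))
# -> "refresh_metrics"
```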
Required Arguments: data_refresh: dataclass containing configuration information for the DAG - external_dag_ids: list of ids of the other data refresh DAGs. This DAG - will not run concurrently with any dependent DAG. + external_dag_ids: list of ids of the other data refresh DAGs. The data refresh step + of this DAG will not run concurrently with the corresponding step + of any dependent DAG. """ default_args = { **DAG_DEFAULT_ARGS, **data_refresh.default_args, - "pool": DATA_REFRESH_POOL, } - poke_interval = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15)) + dag = DAG( dag_id=data_refresh.dag_id, default_args=default_args, @@ -142,54 +151,40 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc ) with dag: - # Wait to ensure that no other Data Refresh DAGs are running. - wait_for_data_refresh = SingleRunExternalDAGsSensor( - task_id="wait_for_data_refresh", - external_dag_ids=external_dag_ids, - check_existence=True, - dag=dag, - poke_interval=poke_interval, - mode="reschedule", + # Check if this is the first DagRun of the month for this DAG. + month_check = BranchPythonOperator( + task_id="month_check", + python_callable=_month_check, + op_kwargs={ + "dag_id": data_refresh.dag_id, + }, ) - data_refresh_post_data = { - "model": data_refresh.media_type, - "action": "INGEST_UPSTREAM", - } - - # Trigger the refresh on the data refresh server. - trigger_data_refresh = SimpleHttpOperator( - task_id="trigger_data_refresh", - http_conn_id="data_refresh", - endpoint="task", - method="POST", - headers={"Content-Type": "application/json"}, - data=json.dumps(data_refresh_post_data), - response_check=lambda response: response.status_code == 202, - response_filter=response_filter_data_refresh, - dag=dag, + # Refresh underlying popularity tables. This is required infrequently in order + # to update new popularity metrics and constants, so this branch is only taken + # if it is the first run of the month (or when forced). + refresh_popularity_metrics = create_refresh_popularity_metrics_task_group( + data_refresh ) - # Wait for the data refresh to complete. - wait_for_completion = HttpSensor( - task_id="wait_for_completion", - http_conn_id="data_refresh", - endpoint=XCOM_PULL_TEMPLATE.format( - trigger_data_refresh.task_id, "return_value" - ), - method="GET", - response_check=response_check_wait_for_completion, - dag=dag, - mode="reschedule", - poke_interval=poke_interval, - timeout=(60 * 60 * 24 * 3), # 3 days + # Refresh the materialized view. This occurs on all DagRuns and updates + # popularity data for newly ingested records. + refresh_matview = create_refresh_view_data_task(data_refresh) + + # Trigger the actual data refresh on the remote data refresh server, and wait + # for it to complete. + data_refresh_group = create_data_refresh_task_group( + data_refresh, external_dag_ids ) - wait_for_data_refresh >> trigger_data_refresh >> wait_for_completion + # Set up task dependencies + month_check >> [refresh_popularity_metrics, refresh_matview] + refresh_popularity_metrics >> refresh_matview >> data_refresh_group return dag +# Generate a data refresh DAG for each DATA_REFRESH_CONFIG. 
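As the comment above notes, one DAG is generated per entry in `DATA_REFRESH_CONFIGS`. A rough sketch of what that amounts to, assuming only the `image` and `audio` configs shown later in this diff (the loop's internal variable name is illustrative, since the loop body sits outside the diff context):

```python
from data_refresh.data_refresh_types import DATA_REFRESH_CONFIGS

# DataRefresh.__post_init__ (see data_refresh_types.py below) derives each
# dag_id from the media type.
dag_ids = [config.dag_id for config in DATA_REFRESH_CONFIGS]
# -> ["image_data_refresh", "audio_data_refresh"]

for config in DATA_REFRESH_CONFIGS:
    # Each DAG receives the ids of the *other* data refresh DAGs so that the
    # wait_for_data_refresh sensor can enforce mutual exclusion between them.
    external_dag_ids = [dag_id for dag_id in dag_ids if dag_id != config.dag_id]
    # e.g. ["audio_data_refresh"] when building image_data_refresh
```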
all_data_refresh_dag_ids = {refresh.dag_id for refresh in DATA_REFRESH_CONFIGS} for data_refresh in DATA_REFRESH_CONFIGS: diff --git a/openverse_catalog/dags/data_refresh/data_refresh_task_factory.py b/openverse_catalog/dags/data_refresh/data_refresh_task_factory.py new file mode 100644 index 00000000000..35724ec2254 --- /dev/null +++ b/openverse_catalog/dags/data_refresh/data_refresh_task_factory.py @@ -0,0 +1,174 @@ +""" +# Data Refresh TaskGroup Factory +This file generates the data refresh TaskGroup using a factory function. +This TaskGroup initiates a data refresh for a given media type and awaits the +success or failure of the refresh. Importantly, it is also configured to +ensure that no two remote data refreshes can run concurrently, as required by +the server. + +A data refresh occurs on the data refresh server in the openverse-api project. +This is a task which imports data from the upstream Catalog database into the +API, copies contents to a new Elasticsearch index, and makes the index "live". +This process is necessary to make new content added to the Catalog by our +provider DAGs available on the frontend. You can read more in the [README]( +https://github.com/WordPress/openverse-api/blob/main/ingestion_server/README.md +) + +The TaskGroup generated by this factory allows us to schedule those refreshes through +Airflow. Since no two refreshes can run simultaneously, all tasks are initially +funneled through a special `data_refresh` pool with a single worker slot. To ensure +that tasks run in an acceptable order (ie the trigger step for one DAG cannot run if a +previously triggered refresh is still running), each DAG has the following +steps: + +1. The `wait_for_data_refresh` step uses a custom Sensor that will wait until +none of the `external_dag_ids` (corresponding to the other data refresh DAGs) +are 'running'. A DAG is considered to be 'running' if it is itself in the +RUNNING state __and__ its own `wait_for_data_refresh` step has completed +successfully. The Sensor suspends itself and frees up the worker slot if +another data refresh DAG is running. + +2. The `trigger_data_refresh` step then triggers the data refresh by POSTing +to the `/task` endpoint on the data refresh server with relevant data. A +successful response will include the `status_check` url used to check on the +status of the refresh, which is passed on to the next task via XCom. + +3. Finally the `wait_for_data_refresh` task waits for the data refresh to be +complete by polling the `status_url`. Note this task does not need to be +able to suspend itself and free the worker slot, because we want to lock the +entire pool on waiting for a particular data refresh to run. 
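To make steps 2 and 3 above concrete, here is a small illustration of how the `status_check` URL travels from the trigger response to the polling sensor. The response body and hostname are made up; only the `status_check` key and the Jinja template from `common.constants` come from the code in this PR:

```python
from urllib.parse import urlparse

# Hypothetical response body from POSTing to /task; only the "status_check"
# key is relied upon by response_filter_data_refresh.
response_json = {"status_check": "http://ingestion-server:8001/task/9a8b7c"}

# The trigger task's response_filter pushes just the path to XCom...
status_endpoint = urlparse(response_json["status_check"]).path  # "/task/9a8b7c"

# ...and wait_for_completion pulls it back as its endpoint via the Jinja
# template defined in common.constants:
XCOM_PULL_TEMPLATE = "{{{{ ti.xcom_pull(task_ids='{}', key='{}') }}}}"
XCOM_PULL_TEMPLATE.format("trigger_data_refresh", "return_value")
# -> "{{ ti.xcom_pull(task_ids='trigger_data_refresh', key='return_value') }}"
```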
+ +You can find more background information on this process in the following +issues and related PRs: + +- [[Feature] Data refresh orchestration DAG]( +https://github.com/WordPress/openverse-catalog/issues/353) +""" +import json +import logging +import os +from typing import Sequence +from urllib.parse import urlparse + +from airflow.exceptions import AirflowException +from airflow.providers.http.operators.http import SimpleHttpOperator +from airflow.providers.http.sensors.http import HttpSensor +from airflow.utils.task_group import TaskGroup +from common.constants import XCOM_PULL_TEMPLATE +from common.sensors.single_run_external_dags_sensor import SingleRunExternalDAGsSensor +from data_refresh.data_refresh_types import DataRefresh +from requests import Response + + +logger = logging.getLogger(__name__) + + +DATA_REFRESH_POOL = "data_refresh" + + +def response_filter_data_refresh(response: Response) -> str: + """ + Filter for the `trigger_data_refresh` task, used to grab the endpoint needed + to poll for the status of the triggered data refresh. This information will + then be available via XCom in the downstream tasks. + """ + status_check_url = response.json()["status_check"] + return urlparse(status_check_url).path + + +def response_check_wait_for_completion(response: Response) -> bool: + """ + Response check to the `wait_for_completion` Sensor. Processes the response to + determine whether the task can complete. + """ + data = response.json() + + if data["active"]: + # The data refresh is still running. Poll again later. + return False + + if data["error"]: + raise AirflowException("Error triggering data refresh.") + + logger.info( + f"Data refresh done with {data['percent_completed']}% \ + completed." + ) + return True + + +def create_data_refresh_task_group( + data_refresh: DataRefresh, external_dag_ids: Sequence[str] +): + """ + This factory method instantiates a DAG that will run the data refresh for + the given `media_type`. + + A data refresh runs for a given media type in the API DB. It refreshes popularity + data for that type, imports the data from the upstream DB in the Catalog, reindexes + the data, and updates and reindex Elasticsearch. + + A data refresh can only be performed for one media type at a time, so the DAG + must also use a Sensor to make sure that no two data refresh tasks run + concurrently. + + It is intended that the data_refresh tasks, or at least the initial + `wait_for_data_refresh` tasks, should be run in a custom pool with 1 worker + slot. This enforces that no two `wait_for_data_refresh` tasks can start + concurrently and enter a race condition. + + Required Arguments: + + data_refresh: dataclass containing configuration information for the + DAG + external_dag_ids: list of ids of the other data refresh DAGs. This DAG + will not run concurrently with any dependent DAG. + """ + + poke_interval = int(os.getenv("DATA_REFRESH_POKE_INTERVAL", 60 * 15)) + + with TaskGroup(group_id="data_refresh") as data_refresh_group: + # Wait to ensure that no other Data Refresh DAGs are running. + wait_for_data_refresh = SingleRunExternalDAGsSensor( + task_id="wait_for_data_refresh", + external_dag_ids=external_dag_ids, + check_existence=True, + poke_interval=poke_interval, + mode="reschedule", + pool=DATA_REFRESH_POOL, + ) + + data_refresh_post_data = { + "model": data_refresh.media_type, + "action": "INGEST_UPSTREAM", + } + + # Trigger the refresh on the data refresh server. 
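Before the operator defined just below, a sketch of the HTTP call it issues, written with `requests` purely for illustration. The base URL is a placeholder; in Airflow it is resolved from the `data_refresh` connection:

```python
import requests

# Placeholder host; in Airflow this comes from the "data_refresh" connection.
BASE_URL = "http://ingestion-server:8001"

response = requests.post(
    f"{BASE_URL}/task",
    json={"model": "image", "action": "INGEST_UPSTREAM"},
    headers={"Content-Type": "application/json"},
)
# The operator's response_check requires a 202, and the returned body carries
# the status_check URL consumed downstream by the wait_for_completion sensor.
assert response.status_code == 202
status_check_url = response.json()["status_check"]
```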
+ trigger_data_refresh = SimpleHttpOperator( + task_id="trigger_data_refresh", + http_conn_id="data_refresh", + endpoint="task", + method="POST", + headers={"Content-Type": "application/json"}, + data=json.dumps(data_refresh_post_data), + response_check=lambda response: response.status_code == 202, + response_filter=response_filter_data_refresh, + ) + + # Wait for the data refresh to complete. + wait_for_completion = HttpSensor( + task_id="wait_for_completion", + http_conn_id="data_refresh", + endpoint=XCOM_PULL_TEMPLATE.format( + trigger_data_refresh.task_id, "return_value" + ), + method="GET", + response_check=response_check_wait_for_completion, + mode="reschedule", + poke_interval=poke_interval, + timeout=data_refresh.data_refresh_timeout, + ) + + wait_for_data_refresh >> trigger_data_refresh >> wait_for_completion + + return data_refresh_group diff --git a/openverse_catalog/dags/data_refresh/data_refresh_types.py b/openverse_catalog/dags/data_refresh/data_refresh_types.py index 7bcbb3f0a74..b4fce771f5a 100644 --- a/openverse_catalog/dags/data_refresh/data_refresh_types.py +++ b/openverse_catalog/dags/data_refresh/data_refresh_types.py @@ -17,28 +17,34 @@ class DataRefresh: Required Constructor Arguments: - media_type: string describing the media type to be refreshed. + media_type: str describing the media type to be refreshed. Optional Constructor Arguments: - default_args: dictionary which is passed to the airflow.dag.DAG - __init__ method. - start_date: datetime.datetime giving the - first valid execution_date of the DAG. - schedule_interval: string giving the schedule on which the DAG should - be run. Passed to the airflow.dag.DAG __init__ - method. - execution_timeout: datetime.timedelta giving the amount of time a given data - pull may take. - doc_md: string which should be used for the DAG's documentation markdown + default_args: dictionary which is passed to the airflow.dag.DAG + __init__ method. + start_date: datetime.datetime giving the + first valid execution_date of the DAG. + schedule_interval: string giving the schedule on which the DAG should + be run. Passed to the airflow.dag.DAG __init__ + method. + data_refresh_timeout: int giving the amount of time in seconds a given + data pull may take. + refresh_metrics_timeout: timedelta expressing amount of time the refresh + popularity metrics tasks may take. + refresh_matview_timeout: timedelta expressing amount of time the refresh + of the popularity matview may take. 
+ doc_md: str used for the DAG's documentation markdown """ dag_id: str = field(init=False) media_type: str start_date: datetime = datetime(2020, 1, 1) - execution_timeout: timedelta = timedelta(hours=24) schedule_interval: Optional[str] = "@weekly" default_args: Optional[Dict] = field(default_factory=dict) + data_refresh_timeout: int = 24 * 60 * 60 # 1 day + refresh_metrics_timeout: timedelta = timedelta(hours=1) + refresh_matview_timeout: timedelta = timedelta(hours=1) def __post_init__(self): self.dag_id = f"{self.media_type}_data_refresh" @@ -47,7 +53,9 @@ def __post_init__(self): DATA_REFRESH_CONFIGS = [ DataRefresh( media_type="image", - execution_timeout=timedelta(days=3), + data_refresh_timeout=3 * 24 * 60 * 60, # 3 days, + refresh_metrics_timeout=timedelta(hours=24), + refresh_matview_timeout=timedelta(hours=24), ), DataRefresh(media_type="audio"), ] diff --git a/openverse_catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py b/openverse_catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py new file mode 100644 index 00000000000..9554345bd85 --- /dev/null +++ b/openverse_catalog/dags/data_refresh/refresh_popularity_metrics_task_factory.py @@ -0,0 +1,61 @@ +""" +# Refresh Popularity Metrics TaskGroup Factory +This file generates a TaskGroup that refreshes the underlying popularity DB +tables, using a factory function. + +This step updates any changes to popularity metrics, and recalculates the +popularity constants. It should be run at least once every month, or whenever +a new popularity metric is added. Scheduling is handled in the parent data +refresh DAG. +""" +from airflow.operators.python import PythonOperator +from airflow.utils.task_group import TaskGroup +from common.constants import POSTGRES_CONN_ID +from common.popularity import sql +from data_refresh.data_refresh_types import DataRefresh + + +GROUP_ID = "refresh_popularity_metrics_and_constants" +UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID = "update_media_popularity_metrics_table" +UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID = "update_media_popularity_constants_view" + + +def create_refresh_popularity_metrics_task_group(data_refresh: DataRefresh): + """ + This factory method instantiates a TaskGroup that will update the popularity + DB tables for the given media type, including percentiles and popularity + metrics. + + Required Arguments: + + data_refresh: configuration data for the data refresh + """ + media_type = data_refresh.media_type + execution_timeout = data_refresh.refresh_metrics_timeout + + with TaskGroup(group_id=GROUP_ID) as refresh_all_popularity_data: + update_metrics = PythonOperator( + task_id=UPDATE_MEDIA_POPULARITY_METRICS_TASK_ID, + python_callable=sql.update_media_popularity_metrics, + op_args=[POSTGRES_CONN_ID, media_type], + execution_timeout=execution_timeout, + doc=( + "Updates the popularity metrics table, adding any new " + "popularity metrics and updating the configured percentile." + ), + ) + + update_constants = PythonOperator( + task_id=UPDATE_MEDIA_POPULARITY_CONSTANTS_TASK_ID, + python_callable=sql.update_media_popularity_constants, + op_args=[POSTGRES_CONN_ID, media_type], + execution_timeout=execution_timeout, + doc=( + "Updates the popularity constants view. This completely " + "recalculates the popularity constants for each provider." 
+ ), + ) + + update_metrics >> update_constants + + return refresh_all_popularity_data diff --git a/openverse_catalog/dags/data_refresh/refresh_view_data_task_factory.py b/openverse_catalog/dags/data_refresh/refresh_view_data_task_factory.py new file mode 100644 index 00000000000..a6ed03e545d --- /dev/null +++ b/openverse_catalog/dags/data_refresh/refresh_view_data_task_factory.py @@ -0,0 +1,43 @@ +""" +# Refresh Materialized View Task Factory +This file generates a Task that refreshes the materialized view for a +given media type, using a factory function. + +The task refreshes the materialized view, but not the underlying tables. This +means that the only effect is to add or update data (including popularity data) +for records which have been ingested since the last time the view was +refreshed. + +This should be run every time before a data refresh is triggered. +""" +from airflow.operators.python import PythonOperator +from airflow.utils.trigger_rule import TriggerRule +from common.constants import POSTGRES_CONN_ID +from common.popularity import sql +from data_refresh.data_refresh_types import DataRefresh + + +UPDATE_DB_VIEW_TASK_ID = "update_materialized_popularity_view" + + +def create_refresh_view_data_task(data_refresh: DataRefresh): + """ + The task refreshes the materialized view for the given media type. The view collates + popularity data for each record. Refreshing has the effect of adding popularity data + for records that were ingested since the last time the view was refreshed, and + updating popularity data for existing records. + + Required Arguments: + + data_refresh: configuration information for the data refresh + """ + refresh_matview = PythonOperator( + task_id=UPDATE_DB_VIEW_TASK_ID, + python_callable=sql.update_db_view, + op_args=[POSTGRES_CONN_ID, data_refresh.media_type], + execution_timeout=data_refresh.refresh_matview_timeout, + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + doc_md=create_refresh_view_data_task.__doc__, + ) + + return refresh_matview diff --git a/openverse_catalog/dags/database/refresh_all_audio_popularity_data.py b/openverse_catalog/dags/database/refresh_all_audio_popularity_data.py deleted file mode 100644 index eb062d7245b..00000000000 --- a/openverse_catalog/dags/database/refresh_all_audio_popularity_data.py +++ /dev/null @@ -1,69 +0,0 @@ -""" -This file defines an Apache Airflow DAG that completely recalculates all -popularity data, including the percentile values, and also adding any -new popularity metrics. - -This should be run at least once every 6 months, or whenever a new -popularity metric is added. 
-""" -import os -from datetime import datetime, timedelta - -from airflow import DAG -from common import slack -from common.popularity import operators - - -DAG_ID = "refresh_all_audio_popularity_data" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -MAX_ACTIVE_TASKS = 1 -SCHEDULE_CRON = "@monthly" - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 6, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=3600), - "on_failure_callback": slack.on_failure_callback, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=MAX_ACTIVE_TASKS, - schedule_cron=SCHEDULE_CRON, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - max_active_tasks=max_active_tasks, - max_active_runs=max_active_runs, - schedule_interval=schedule_cron, - catchup=False, - tags=["database"], - ) - with dag: - update_metrics = operators.update_media_popularity_metrics( - postgres_conn_id, - media_type="audio", - ) - update_constants = operators.update_media_popularity_constants( - postgres_conn_id, - media_type="audio", - ) - update_image_view = operators.update_db_view( - postgres_conn_id, - media_type="audio", - ) - - (update_metrics >> update_constants >> update_image_view) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/openverse_catalog/dags/database/refresh_all_image_popularity_data.py b/openverse_catalog/dags/database/refresh_all_image_popularity_data.py deleted file mode 100644 index be9bde483d8..00000000000 --- a/openverse_catalog/dags/database/refresh_all_image_popularity_data.py +++ /dev/null @@ -1,60 +0,0 @@ -""" -This file defines an Apache Airflow DAG that completely recalculates all -popularity data, including the percentile values, and also adding any -new popularity metrics. - -This should be run at least once every 6 months, or whenever a new -popularity metric is added. 
-""" -import os -from datetime import datetime, timedelta - -from airflow import DAG -from common import slack -from common.popularity import operators - - -DAG_ID = "refresh_all_image_popularity_data" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -MAX_ACTIVE_TASKS = 1 -SCHEDULE_CRON = "@monthly" - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 6, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=3600), - "on_failure_callback": slack.on_failure_callback, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=MAX_ACTIVE_TASKS, - schedule_cron=SCHEDULE_CRON, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - max_active_tasks=max_active_tasks, - max_active_runs=max_active_runs, - schedule_interval=schedule_cron, - catchup=False, - tags=["database"], - ) - with dag: - update_metrics = operators.update_media_popularity_metrics(postgres_conn_id) - update_constants = operators.update_media_popularity_constants(postgres_conn_id) - update_image_view = operators.update_db_view(postgres_conn_id) - - (update_metrics >> update_constants >> update_image_view) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/openverse_catalog/dags/database/refresh_audio_view_data.py b/openverse_catalog/dags/database/refresh_audio_view_data.py deleted file mode 100644 index ed4292c8eb4..00000000000 --- a/openverse_catalog/dags/database/refresh_audio_view_data.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -This file defines an Apache Airflow DAG that refreshes the data in -image_view, but not the underlying tables. This means the only effect -of this DAG is to add or update data (including popularity data) for -images which have been ingested since the last time the view was -refreshed. - -This should be run once per day. -""" -import os -from datetime import datetime, timedelta - -from airflow import DAG -from common import slack -from common.popularity import operators - - -DAG_ID = "refresh_audio_view_data" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -MAX_ACTIVE_TASKS = 1 -# We don't run on the first of the month, since the -# `refresh_all_audio_popularity_data` DAG should run on that day. 
-SCHEDULE_CRON = "0 0 2-31 * *" - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 6, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=3600), - "on_failure_callback": slack.on_failure_callback, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=MAX_ACTIVE_TASKS, - schedule_cron=SCHEDULE_CRON, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - max_active_tasks=max_active_tasks, - max_active_runs=max_active_runs, - schedule_interval=schedule_cron, - catchup=False, - tags=["database"], - ) - with dag: - operators.update_db_view(postgres_conn_id, media_type="audio") - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/openverse_catalog/dags/database/refresh_image_view_data.py b/openverse_catalog/dags/database/refresh_image_view_data.py deleted file mode 100644 index fe25a87607a..00000000000 --- a/openverse_catalog/dags/database/refresh_image_view_data.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -This file defines an Apache Airflow DAG that refreshes the data in -image_view, but not the underlying tables. This means the only effect -of this DAG is to add or update data (including popularity data) for -images which have been ingested since the last time the view was -refreshed. - -This should be run once per day. -""" -import os -from datetime import datetime, timedelta - -from airflow import DAG -from common import slack -from common.popularity import operators - - -DAG_ID = "refresh_image_view_data" -DB_CONN_ID = os.getenv("OPENLEDGER_CONN_ID", "postgres_openledger_testing") -MAX_ACTIVE_TASKS = 1 -# We don't run on the first of the month, since the -# `refresh_all_image_popularity_data` DAG should run on that day. 
-SCHEDULE_CRON = "0 0 2-31 * *" - -DAG_DEFAULT_ARGS = { - "owner": "data-eng-admin", - "depends_on_past": False, - "start_date": datetime(2020, 6, 15), - "email_on_retry": False, - "retries": 2, - "retry_delay": timedelta(seconds=3600), - "on_failure_callback": slack.on_failure_callback, -} - - -def create_dag( - dag_id=DAG_ID, - args=DAG_DEFAULT_ARGS, - max_active_tasks=MAX_ACTIVE_TASKS, - max_active_runs=MAX_ACTIVE_TASKS, - schedule_cron=SCHEDULE_CRON, - postgres_conn_id=DB_CONN_ID, -): - dag = DAG( - dag_id=dag_id, - default_args=args, - max_active_tasks=max_active_tasks, - max_active_runs=max_active_runs, - schedule_interval=schedule_cron, - catchup=False, - tags=["database"], - ) - with dag: - operators.update_db_view(postgres_conn_id) - - return dag - - -globals()[DAG_ID] = create_dag() diff --git a/tests/dags/data_refresh/test_dag_factory.py b/tests/dags/data_refresh/test_dag_factory.py new file mode 100644 index 00000000000..eba1e2f420e --- /dev/null +++ b/tests/dags/data_refresh/test_dag_factory.py @@ -0,0 +1,119 @@ +import pytest +from airflow.models import DagRun +from airflow.models.dag import DAG +from airflow.utils.session import create_session +from airflow.utils.state import State +from airflow.utils.timezone import datetime +from airflow.utils.types import DagRunType +from data_refresh import dag_factory +from data_refresh.dag_factory import ( + REFRESH_MATERIALIZED_VIEW_TASK_ID, + REFRESH_POPULARITY_METRICS_TASK_ID, +) + + +TEST_DAG_ID = "data_refresh_dag_factory_test_dag" +TEST_DAG = DAG(TEST_DAG_ID, default_args={"owner": "airflow"}) + + +@pytest.fixture(autouse=True) +def clean_db(): + with create_session() as session: + session.query(DagRun).filter(DagRun.dag_id == TEST_DAG_ID).delete() + + +def _create_dagrun(start_date, dag_state, conf={}): + return TEST_DAG.create_dagrun( + start_date=start_date, + execution_date=start_date, + data_interval=(start_date, start_date), + state=dag_state, + run_type=DagRunType.MANUAL, + conf=conf, + ) + + +@pytest.mark.parametrize( + "force_refresh_metrics, last_dagrun_date, today_date, expected_task_id", + [ + # Last dagrun was in the same month + ( + None, + datetime(2022, 3, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_MATERIALIZED_VIEW_TASK_ID, + ), + # Last dagrun was in the same month, different year + ( + None, + datetime(2021, 3, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_POPULARITY_METRICS_TASK_ID, + ), + # Last dagrun was in a previous month + ( + None, + datetime(2022, 2, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_POPULARITY_METRICS_TASK_ID, + ), + # `force_refresh_metrics` is turned on + # Last dagrun was in the same month + ( + True, + datetime(2022, 3, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_POPULARITY_METRICS_TASK_ID, + ), + # Last dagrun was in a previous month + ( + True, + datetime(2022, 2, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_POPULARITY_METRICS_TASK_ID, + ), + # `force_refresh_metrics` is explicitly false + # Last dagrun was in the same month + ( + False, + datetime(2022, 3, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_MATERIALIZED_VIEW_TASK_ID, + ), + # Last dagrun was in a previous month + ( + False, + datetime(2022, 2, 1, 0, 0, 0), + datetime(2022, 3, 2, 0, 0, 0), + REFRESH_MATERIALIZED_VIEW_TASK_ID, + ), + ], +) +def test_month_check_returns_correct_task_id( + force_refresh_metrics, last_dagrun_date, today_date, expected_task_id +): + # Create latest dagrun + _create_dagrun(last_dagrun_date, State.SUCCESS) + # Create current dagrun 
+ _create_dagrun( + today_date, State.RUNNING, {"force_refresh_metrics": force_refresh_metrics} + ) + + next_task_id = dag_factory._month_check(TEST_DAG.dag_id) + assert next_task_id == expected_task_id + + +def test_month_check_ignores_failed_dagruns(): + # Create running dagrun + _create_dagrun(datetime(2022, 3, 2, 0, 0, 0), State.RUNNING) + + # Create previous dagrun in same month, but with failed state + _create_dagrun(datetime(2022, 3, 1, 0, 0, 0), State.FAILED) + + # Create successful dagrun in previous month + _create_dagrun(datetime(2022, 2, 1, 0, 0, 0), State.SUCCESS) + + # Even though there was a previous run this month, it failed. The last + # successful run was last month, so we should refresh metrics. + next_task_id = dag_factory._month_check(TEST_DAG.dag_id) + assert next_task_id == REFRESH_POPULARITY_METRICS_TASK_ID diff --git a/tests/dags/test_dag_parsing.py b/tests/dags/test_dag_parsing.py index 28a9dbe1d00..8d3095083ec 100644 --- a/tests/dags/test_dag_parsing.py +++ b/tests/dags/test_dag_parsing.py @@ -41,8 +41,6 @@ "commoncrawl/commoncrawl_etl.py", "database/loader_workflow.py", "database/recreate_image_popularity_calculation.py", - "database/refresh_all_image_popularity_data.py", - "database/refresh_image_view_data.py", "oauth2/authorize_dag.py", "oauth2/token_refresh_dag.py", ]
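Beyond the unit tests above, a quick way to confirm that the factory still registers both generated DAGs and that nothing fails to import is to load the DAG folder into a `DagBag`. A minimal sketch, assuming the `openverse_catalog/dags` layout used throughout this diff:

```python
from airflow.models import DagBag

# The dag_folder path is an assumption based on the repository layout shown here.
dag_bag = DagBag(dag_folder="openverse_catalog/dags", include_examples=False)

assert not dag_bag.import_errors
for dag_id in ("image_data_refresh", "audio_data_refresh"):
    assert dag_id in dag_bag.dags
```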