
Commit 4a9c008

Update reingestion workflows to load and report data (#618)
* Refactor Wikimedia Commons to use ProviderDataIngester
* Add main function
* Refactor Wikimedia Commons to use ProviderDataIngester
* Refactor provider_dag_factory to separate out the ingestion tasks from the reporting
* Append the day_shift to the end of ingestion tasks if not zero
* Update reporting task to sum values if a list is provided
* Fix reporting tests
* Update MediaStores to take a date and append to end of tsv filename
* Initialize MediaStores with date in ProviderDataIngester
* Refactor ingestion_workflow dag factory to include load/reporting steps
* Update ingestion dag factory to accept ProviderDataIngester classes
* Update Wikimedia ingestion workflow configuration
* Update reingestion workflow tests
* Add pre/post ingestion tasks and load timeouts to reingestion flows
* Add date_partition_for_prefix macro
* Fix tests
* Remove changes from rebase
* Rename ingestion to reingestion
* Update types in reporting module
* Refactor args and types

  There were many methods that had large argument lists that were simply being
  passed through, making documentation very messy. Updated to pass the entire
  workflow conf object through. Also updated some docstrings and refined the
  way defaults are set on the workflow confs.
* Update reporting tests
* Report DAG id instead of provider name, pass through dates
* Add disclaimer about aggregate duration
* Update dag parsing tests
* Temporarily change reingestion schedule to weekly
* Update DAG docs
* Update test
* Remove TODO
* Update variable name to clarify that it is a boolean
* Remove TODO
* Adjust function order
* Add tags for media types
* Improve variable names
* Fix tests
* Fix test import
* Partition by reingestion date in tsv filenames

  Avoids collisions on tsv filenames when reingestion runs for two different
  dates at the same time.
  - Removes the old implementation, which appended ingestion date to the actual filename
  - Instead partitions by the reingestion date
* Fix and update tsv partition tests
* Make task names more verbose
* Pluralize 'tasks' in slack message
1 parent fc62774 · commit 4a9c008

17 files changed (+675, -374 lines)

DAGs.md

Lines changed: 50 additions & 3 deletions
```diff
@@ -102,9 +102,9 @@ The following are DAGs grouped by their primary tag:
 
 | DAG ID | Schedule Interval |
 | --- | --- |
-| `europeana_ingestion_workflow` | `@daily` |
-| `flickr_ingestion_workflow` | `@daily` |
-| `wikimedia_ingestion_workflow` | `@daily` |
+| [`europeana_reingestion_workflow`](#europeana_reingestion_workflow) | `@weekly` |
+| [`flickr_reingestion_workflow`](#flickr_reingestion_workflow) | `@weekly` |
+| [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow) | `@weekly` |
 
 
 # DAG documentation
@@ -114,7 +114,9 @@ The following is documentation associated with each DAG (where available):
 1. [`airflow_log_cleanup`](#airflow_log_cleanup)
 1. [`audio_data_refresh`](#audio_data_refresh)
 1. [`check_silenced_dags`](#check_silenced_dags)
+1. [`europeana_reingestion_workflow`](#europeana_reingestion_workflow)
 1. [`europeana_workflow`](#europeana_workflow)
+1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
 1. [`flickr_workflow`](#flickr_workflow)
 1. [`freesound_workflow`](#freesound_workflow)
 1. [`image_data_refresh`](#image_data_refresh)
@@ -133,6 +135,7 @@ The following is documentation associated with each DAG (where available):
 1. [`tsv_to_postgres_loader`](#tsv_to_postgres_loader)
 1. [`walters_workflow`](#walters_workflow)
 1. [`wikimedia_commons_workflow`](#wikimedia_commons_workflow)
+1. [`wikimedia_reingestion_workflow`](#wikimedia_reingestion_workflow)
 1. [`wordpress_workflow`](#wordpress_workflow)
 
 
@@ -215,6 +218,20 @@ The DAG runs weekly.
 
 
 
+## `europeana_reingestion_workflow`
+
+
+Content Provider: Europeana
+
+ETL Process: Use the API to identify all CC licensed images.
+
+Output: TSV file containing the images and the
+respective meta-data.
+
+Notes: https://www.europeana.eu/api/v2/search.json
+
+
+
 ## `europeana_workflow`
 
 
@@ -229,6 +246,21 @@ Notes: https://www.europeana.eu/api/v2/search.json
 
 
 
+## `flickr_reingestion_workflow`
+
+
+Content Provider: Flickr
+
+ETL Process: Use the API to identify all CC licensed images.
+
+Output: TSV file containing the images and the
+respective meta-data.
+
+Notes: https://www.flickr.com/help/terms/api
+Rate limit: 3600 requests per hour.
+
+
+
 ## `flickr_workflow`
 
 
@@ -599,6 +631,21 @@ Notes: https://commons.wikimedia.org/wiki/API:Main_page
 
 
 
+## `wikimedia_reingestion_workflow`
+
+
+Content Provider: Wikimedia Commons
+
+ETL Process: Use the API to identify all CC-licensed images.
+
+Output: TSV file containing the image, the respective
+meta-data.
+
+Notes: https://commons.wikimedia.org/wiki/API:Main_page
+No rate limit specified.
+
+
+
 ## `wordpress_workflow`
 
 
```
openverse_catalog/dags/common/helpers.py

Lines changed: 3 additions & 3 deletions
```diff
@@ -6,13 +6,13 @@ class IngestionInput(NamedTuple):
     repeats: int
 
 
-def get_reingestion_day_list_list(inputs: List[IngestionInput]):
+def get_partitioned_reingestion_days(inputs: List[IngestionInput]):
     """
     This method calculates day-shift lists for Provider API workflows.
 
     The input should be a list of pairs of integers:
 
-    `get_reingestion_day_list_list((x_0, y_0), ..., (x_n, y_n))`
+    `get_partitioned_reingestion_days((x_0, y_0), ..., (x_n, y_n))`
 
     The return will be a list of lists of integers. The zeroth inner
     list will be a list of integers counting by x_0, of length y_0. The
@@ -23,7 +23,7 @@ def get_reingestion_day_list_list(inputs: List[IngestionInput]):
     list.
 
     For example,
-        get_reingestion_day_list_list((1, 2), (2, 3), (3, 2))
+        get_partitioned_reingestion_days((1, 2), (2, 3), (3, 2))
     returns
         [[1, 2], [4, 6, 8], [11, 14]]
     """
```

openverse_catalog/dags/common/loader/reporting.py

Lines changed: 64 additions & 10 deletions
```diff
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 import logging
-from typing import NamedTuple, Optional
+from typing import NamedTuple, Optional, Sequence
 
 from common.slack import send_message
 
```
```diff
@@ -26,11 +26,24 @@ class RecordMetrics(NamedTuple):
     foreign_id_dup: Optional[int]
     url_dup: Optional[int]
 
+    def _add_counts(self, a, b):
+        return (a or 0) + (b or 0)
+
+    def __add__(self, other):
+        if other is None:
+            return self
+        return RecordMetrics(
+            self._add_counts(self.upserted, other.upserted),
+            self._add_counts(self.missing_columns, other.missing_columns),
+            self._add_counts(self.foreign_id_dup, other.foreign_id_dup),
+            self._add_counts(self.url_dup, other.url_dup),
+        )
+
 
 MediaTypeRecordMetrics = dict[str, RecordMetrics]
 
 
-def humanize_time_duration(seconds: float) -> str:
+def humanize_time_duration(seconds: float | int) -> str:
     if seconds == 0:
         return "inf"
     elif seconds < 1:
```
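
The `__add__` override is what lets mapped-task results be combined with Python's built-in `sum`, treating `None` counts as zero. A self-contained sketch reproducing the class from this hunk, with a usage example:

```python
from typing import NamedTuple, Optional


class RecordMetrics(NamedTuple):
    upserted: Optional[int]
    missing_columns: Optional[int]
    foreign_id_dup: Optional[int]
    url_dup: Optional[int]

    def _add_counts(self, a, b):
        # Treat missing (None) counts as zero so addition never fails
        return (a or 0) + (b or 0)

    def __add__(self, other):
        if other is None:
            return self
        return RecordMetrics(
            self._add_counts(self.upserted, other.upserted),
            self._add_counts(self.missing_columns, other.missing_columns),
            self._add_counts(self.foreign_id_dup, other.foreign_id_dup),
            self._add_counts(self.url_dup, other.url_dup),
        )


# Partial reports still sum cleanly:
total = RecordMetrics(10, None, 2, None) + RecordMetrics(5, 1, None, None)
assert total == RecordMetrics(15, 1, 2, 0)
```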
```diff
@@ -43,10 +56,39 @@ def humanize_time_duration(seconds: float) -> str:
     return ", ".join(parts)
 
 
+def clean_duration(duration: float | list[float]):
+    # If a list of duration values is provided, get the sum of all non-None values
+    if isinstance(duration, list):
+        duration = sum([x for x in duration if x])
+
+    # Truncate the duration value if it's provided
+    if isinstance(duration, float) or isinstance(duration, int):
+        duration = humanize_time_duration(duration)
+
+    return duration
+
+
+def clean_record_counts(
+    record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics],
+    media_types: Sequence[str],
+):
+    # If a list of record_counts dicts is provided, sum all of the individual values
+    if isinstance(record_counts_by_media_type, list):
+        return {
+            media_type: sum(
+                [x[media_type] for x in record_counts_by_media_type],
+                RecordMetrics(0, 0, 0, 0),
+            )
+            for media_type in media_types
+        }
+    return record_counts_by_media_type
+
+
 def report_completion(
-    provider_name: str,
-    duration: float | str | None,
-    record_counts_by_media_type: MediaTypeRecordMetrics,
+    dag_id: str,
+    media_types: Sequence[str],
+    duration: float | str | list[float] | None,
+    record_counts_by_media_type: MediaTypeRecordMetrics | list[MediaTypeRecordMetrics],
     dated: bool = False,
     date_range_start: str | None = None,
     date_range_end: str | None = None,
```
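
Continuing the sketch above, `clean_record_counts` collapses a list of per-task dicts (one per reingestion day) into a single dict keyed by media type, seeding `sum` with `RecordMetrics(0, 0, 0, 0)`. Hypothetical data, assuming `RecordMetrics` and `clean_record_counts` as defined above:

```python
per_day_counts = [
    {"image": RecordMetrics(10, 0, 1, 0), "audio": RecordMetrics(3, None, 0, 0)},
    {"image": RecordMetrics(7, 2, 0, 1), "audio": RecordMetrics(None, 0, 0, 0)},
]

totals = clean_record_counts(per_day_counts, media_types=["image", "audio"])
assert totals["image"] == RecordMetrics(17, 2, 1, 1)
assert totals["audio"] == RecordMetrics(3, 0, 0, 0)
```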
```diff
@@ -72,9 +114,12 @@
     - `date_range`: The range of time this ingestion covers. If the ingestion covers
       the entire provided dataset, "all" is provided
     """
-    # Truncate the duration value if it's provided
-    if isinstance(duration, float):
-        duration = humanize_time_duration(duration)
+    is_aggregate_duration = isinstance(duration, list)
+
+    duration = clean_duration(duration)
+    record_counts_by_media_type = clean_record_counts(
+        record_counts_by_media_type, media_types
+    )
 
     # List record count per media type
     media_type_reports = ""
```
```diff
@@ -104,10 +149,19 @@
 
     # Collect data into a single message
     message = f"""
-*Provider*: `{provider_name}`
+*DAG*: `{dag_id}`
 *Date range*: {date_range}
-*Duration of data pull task*: {duration or '_No data_'}
+*Duration of data pull tasks*: {duration or '_No data_'}
 *Number of records upserted per media type*:
 {media_type_reports}"""
+
+    if is_aggregate_duration:
+        # Add disclaimer about duration for aggregate data
+        message += (
+            "\n_Duration is the sum of the duration for each data pull task."
+            " It does not include loading time and does not account for data"
+            " pulls that may happen concurrently."
+        )
+
     send_message(message, username="Airflow DAG Load Data Complete")
     return message
```
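
A hypothetical call from a reingestion DAG's reporting step; list-valued `duration` and `record_counts_by_media_type` (as produced by the per-day ingestion tasks) trigger the aggregate-duration disclaimer added above:

```python
report_completion(
    dag_id="wikimedia_reingestion_workflow",
    media_types=["image", "audio"],
    # One duration per data pull task; None entries are ignored in the sum
    duration=[742.1, None, 518.6],
    # One dict per task, summed per media type by clean_record_counts
    record_counts_by_media_type=per_day_counts,
)
```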

openverse_catalog/dags/common/storage/audio.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -18,6 +18,9 @@ class AudioStore(MediaStore):
 
     Optional init arguments:
     provider:       String marking the provider in the `audio` table of the DB.
+    date:           Date String in the form YYYY-MM-DD. This is the date for
+                    which data is being stored. If provided, it will be appended to
+                    the tsv filename.
     output_file:    String giving a temporary .tsv filename (*not* the
                     full path) where the audio info should be stored.
     output_dir:     String giving a path where `output_file` should be placed.
```

openverse_catalog/dags/common/storage/image.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -18,6 +18,9 @@ class ImageStore(MediaStore):
 
     Optional init arguments:
     provider:       String marking the provider in the `image` table of the DB.
+    date:           Date String in the form YYYY-MM-DD. This is the date for
+                    which data is being stored. If provided, it will be appended to
+                    the tsv filename.
     output_file:    String giving a temporary .tsv filename (*not* the
                     full path) where the image info should be stored.
     output_dir:     String giving a path where `output_file` should be placed.
```

openverse_catalog/dags/common/storage/media.py

Lines changed: 5 additions & 4 deletions
```diff
@@ -46,6 +46,9 @@ class MediaStore(metaclass=abc.ABCMeta):
     Optional init arguments:
     provider:       String marking the provider in the `media`
                     (`image`, `audio` etc) table of the DB.
+    date:           Date String in the form YYYY-MM-DD. This is the date for
+                    which data is being stored. If provided, it will be appended to
+                    the tsv filename.
     output_file:    String giving a temporary .tsv filename (*not* the
                     full path) where the media info should be stored.
     output_dir:     String giving a path where `output_file` should be placed.
@@ -66,9 +69,7 @@ def __init__(
         self.provider = provider
         self.buffer_length = buffer_length
         self.output_path = self._initialize_output_path(
-            output_dir,
-            output_file,
-            provider,
+            output_dir, output_file, provider
         )
         self.columns = None
         self._media_buffer = []
@@ -158,7 +159,7 @@ def _initialize_output_path(
         self,
         output_dir: Optional[str],
         output_file: Optional[str],
-        provider: str,
+        provider: Optional[str],
         version: Optional[str] = None,
     ) -> str:
         """Creates the path for the tsv file.
```

openverse_catalog/dags/providers/factory_utils.py

Lines changed: 15 additions & 1 deletion
```diff
@@ -147,7 +147,9 @@ def pull_media_wrapper(
 
 
 def date_partition_for_prefix(
-    schedule_interval: str | None, logical_date: datetime
+    schedule_interval: str | None,
+    logical_date: datetime,
+    reingestion_date: datetime,
 ) -> str:
     """
     Given a schedule interval and the logical date for a DAG run, determine an
@@ -158,6 +160,14 @@
     - Hourly -> `year=YYYY/month=MM/day=DD`
     - Daily -> `year=YYYY/month=MM`
     - None/yearly/monthly/weekly/other -> `year=YYYY`
+
+    If a reingestion_date is supplied, it is further partitioned by the reingestion
+    date itself to avoid filename collisions.
+
+    Example:
+    - Hourly -> `year=YYYY/month=MM/day=DD/reingestion=YYYY-MM-DD`
+    - Daily -> `year=YYYY/month=MM/reingestion=YYYY-MM-DD`
+    - None/yearly/monthly/weekly/other -> `year=YYYY/reingestion=YYYY-MM-DD`
     """
     hourly_airflow = "@hourly"
     hourly_cron = cron_presets[hourly_airflow]
@@ -175,4 +185,8 @@
     if schedule_interval in {hourly_airflow, hourly_cron}:
         prefix += f"/day={logical_date.day:02}"
 
+    # Further partition by reingestion date if supplied
+    if reingestion_date is not None:
+        prefix += f"/reingestion={reingestion_date}"
+
     return prefix
```
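
A minimal re-implementation of the documented partitioning rules, for illustration only; it checks the `@hourly`/`@daily` preset strings and skips the `cron_presets` equivalence handling that the real function performs:

```python
from datetime import date, datetime


def date_partition_for_prefix_sketch(
    schedule_interval: str | None,
    logical_date: datetime,
    reingestion_date: date | None,
) -> str:
    # Always partition by year; add month for daily and day for hourly schedules
    prefix = f"year={logical_date.year}"
    if schedule_interval in {"@hourly", "@daily"}:
        prefix += f"/month={logical_date.month:02}"
    if schedule_interval == "@hourly":
        prefix += f"/day={logical_date.day:02}"
    # Further partition by reingestion date to avoid tsv filename collisions
    if reingestion_date is not None:
        prefix += f"/reingestion={reingestion_date}"
    return prefix


assert (
    date_partition_for_prefix_sketch("@daily", datetime(2022, 9, 15), date(2022, 9, 1))
    == "year=2022/month=09/reingestion=2022-09-01"
)
```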
