Skip to content

Commit

Permalink
Add DAG to remove Flickr thumbnails (#2302)
Browse files Browse the repository at this point in the history
Co-authored-by: Staci Mullins <63313398+stacimc@users.noreply.github.com>
  • Loading branch information
krysal and stacimc authored Jun 6, 2023
1 parent 67a9dd6 commit 7804de6
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 6 deletions.
13 changes: 13 additions & 0 deletions catalog/DAGs.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The following are DAGs grouped by their primary tag:
1. [Database](#database)
1. [Maintenance](#maintenance)
1. [Oauth](#oauth)
1. [Other](#other)
1. [Provider](#provider)
1. [Provider Reingestion](#provider-reingestion)

Expand Down Expand Up @@ -63,6 +64,12 @@ The following are DAGs grouped by their primary tag:
| [`oauth2_authorization`](#oauth2_authorization) | `None` |
| [`oauth2_token_refresh`](#oauth2_token_refresh) | `0 */12 * * *` |

## Other

| DAG ID | Schedule Interval |
| --------------------------------------------------------- | ----------------- |
| [`flickr_thumbnails_removal`](#flickr_thumbnails_removal) | `None` |

## Provider

| DAG ID | Schedule Interval | Dated | Media Type(s) |
Expand Down Expand Up @@ -113,6 +120,7 @@ The following is documentation associated with each DAG (where available):
1. [`finnish_museums_workflow`](#finnish_museums_workflow)
1. [`flickr_audit_sub_provider_workflow`](#flickr_audit_sub_provider_workflow)
1. [`flickr_reingestion_workflow`](#flickr_reingestion_workflow)
1. [`flickr_thumbnails_removal`](#flickr_thumbnails_removal)
1. [`flickr_workflow`](#flickr_workflow)
1. [`freesound_workflow`](#freesound_workflow)
1. [`image_data_refresh`](#image_data_refresh)
Expand Down Expand Up @@ -394,6 +402,11 @@ Output: TSV file containing the images and the respective meta-data.

Notes: https://www.flickr.com/help/terms/api Rate limit: 3600 requests per hour.

## `flickr_thumbnails_removal`

One-time run DAG to remove progressively all the old Flickr thumbnails, as they
were determined to be unsuitable for the Openverse UI requirements.

## `flickr_workflow`

Content Provider: Flickr
Expand Down
80 changes: 80 additions & 0 deletions catalog/dags/flickr_thumbs_removal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
One-time run DAG to remove progressively all the old Flickr thumbnails,
as they were determined to be unsuitable for the Openverse UI requirements.
"""
import logging
from datetime import timedelta
from textwrap import dedent

from airflow.decorators import dag, task

from common.constants import DAG_DEFAULT_ARGS, POSTGRES_CONN_ID
from common.slack import send_message
from common.sql import PostgresHook


logger = logging.getLogger(__name__)


DAG_ID = "flickr_thumbnails_removal"


@dag(
dag_id=DAG_ID,
default_args={
**DAG_DEFAULT_ARGS,
"retries": 0,
"execution_timeout": timedelta(days=7),
},
schedule=None,
catchup=False,
doc_md=__doc__,
)
def flickr_thumbnails_removal():
pg = PostgresHook(postgres_conn_id=POSTGRES_CONN_ID)
select_conditions = "FROM image WHERE provider = 'flickr' AND thumbnail IS NOT NULL"

@task()
def count():
num_thumbs = pg.get_first(f"SELECT COUNT(*) {select_conditions}")[0]
logger.info(f"Flickr thumbnails found: {num_thumbs}.")

return num_thumbs

@task()
def delete(num_thumbs):
log_sql = True
if num_thumbs == 0:
logger.info("No Flickr thumbnails found.")

while num_thumbs > 0:
query = dedent(
f"""
UPDATE image SET thumbnail = NULL WHERE identifier IN (
SELECT identifier {select_conditions}
FETCH FIRST 10000 ROWS ONLY FOR UPDATE SKIP LOCKED
)
"""
)
pg.run(query, log_sql=log_sql)
num_thumbs -= 10000
logger.info(
f"Flickr thumbnails left: {num_thumbs if num_thumbs > 0 else 0}."
)
log_sql = False

@task()
def report():
msg = (
"All Flickr thumbnails were successfully removed. "
f"The `{DAG_ID}` DAG can be retired."
)
send_message(msg, DAG_ID)

num_thumbs = count()
d = delete(num_thumbs)
r = report()
d >> r


flickr_thumbnails_removal()
8 changes: 4 additions & 4 deletions catalog/dags/providers/provider_api_scripts/flickr.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ def ingest_records(self, **kwargs):
for start_ts, end_ts in self.large_batches:
# For each large batch, ingest records for that interval one license
# type at a time.
for license in LICENSE_INFO.keys():
for license_ in LICENSE_INFO.keys():
super().ingest_records_for_timestamp_pair(
start_ts=start_ts, end_ts=end_ts, license=license
start_ts=start_ts, end_ts=end_ts, license=license_
)
logger.info("Completed large batch processing by license type.")

Expand All @@ -139,14 +139,14 @@ def get_next_query_params(self, prev_query_params, **kwargs):

# license will be available in the params if we're dealing
# with a large batch. If not, fall back to all licenses
license = kwargs.get("license", self.default_license_param)
license_ = kwargs.get("license", self.default_license_param)

return {
"min_upload_date": start_timestamp,
"max_upload_date": end_timestamp,
"page": 0,
"api_key": self.api_key,
"license": license,
"license": license_,
"per_page": self.batch_limit,
"method": "flickr.photos.search",
"media": "photos",
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ services:
context: ./docker/upstream_db/
target: db
image: openverse-upstream_db
expose:
- "5432"
ports:
- "50255:5432"
volumes:
- catalog-postgres:/var/lib/postgresql/data
- ./sample_data:/sample_data
Expand Down

0 comments on commit 7804de6

Please sign in to comment.