From d84dddea9dc49e404a6c9539fc4294569e9dc291 Mon Sep 17 00:00:00 2001 From: Madison Swain-Bowden Date: Thu, 20 Jun 2024 15:08:02 -0700 Subject: [PATCH] Move footnote content into the primary document --- ...mentation_plan_ingestion_server_removal.md | 98 ++++++++++--------- 1 file changed, 52 insertions(+), 46 deletions(-) diff --git a/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md b/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md index d109f61c009..cd1543b74da 100644 --- a/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md +++ b/documentation/projects/proposals/ingestion_server_removal/20240328-implementation_plan_ingestion_server_removal.md @@ -347,7 +347,24 @@ server's work: - `Filter Data`: initially, this should be a single python task which exactly mirrors the behavior of the [`clean_image_data` function of `cleanup.py`](https://github.com/WordPress/openverse/blob/47fe5df0e9b8ad3dba06021f4cd4af9139977644/ingestion_server/ingestion_server/cleanup.py#L295) - (only applying the tag-specific steps) on the ingestion server[^3]. + (only applying the tag-specific steps) on the ingestion server[^3]. The + easiest way to do this would be to directly map the functionality of the + ingestion server on this step within a single Airflow task. The steps for this + task are as follows (see [Alternatives](#filtering-approach) for possible + future directions): + 1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE` + 2. Divide batch up into `multiprocessing.cpu_count()` subbatches + 3. Split the filtering up into separate workers using multiprocessing + 4. On each process + 1. Create a new DB connection & cursor per worker + 2. Iterate through each record + 1. Remove tags below confidence level + 2. Remove tags that need to be filtered (denylisted, machine-generated + filter list, provider, etc) + 3. Only surface the record if it needs to be changed + 4. Update each records one by one with a single `UPDATE` + 3. Commit cursor and close connection + 5. Repeat steps 1-4 until all batches are consumed - `Create Index`: we can use our existing [Elasticsearch tasks](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/elasticsearch.py#L82) to create the new elasticsearch index with the index suffix generated in the @@ -565,6 +582,8 @@ No new tools or packages are required. +### ECS approach + The alternative options of using an ECS approach or performing the reindex entirely in Airflow are discussed at length in the [Approach to the Distributed Reindex](#approach-to-the-distributed-reindex) @@ -575,6 +594,37 @@ using EC2 operators to start and stop the instances as needed. However, more infrastructure work is required in this approach, and we would require deployments whenever there are code changes in the indexer workers. +### Filtering approach + +There are a number of ways to accomplish the data filtering, including several +ways to improve the approach mentioned. + +The Airflow scheduler container has access to 4 cores, which is the same as the +ingestion server where this step was originally running. At present, it takes +about 8 hours for all cleanup steps, but that includes the URL cleaning which is +certainly more time intensive than the tag filtering since it makes outbound +requests. Running the tag filtering on Airflow should not impact any of the +other running tasks or saturate the instance. + +There are a few ways this process could be improved, but none of them are +required _at this moment_. We can follow up after this project is complete to +assess what further optimizations might be necessary at this step. Some +potential suggestions for that time: + +- Instead of single `UPDATE` queries for each affected records, we could insert + records from each subbatch to a temporary table. Then the base table could be + updated with an `UPDATE ... FROM` in bulk. Since the indices haven't been + applied to the base table yet, this should be fairly performant. +- Instead of using multiprocessing, we could pre-define the batches and run the + filtering chunks on a set of mapped tasks. The multiprocessing has the benefit + of iterating over a cursor on the database rather than having to manage the + record ranges explicitly, but this would allow further parallelization and + task management. +- The indexer workers themselves could be expanded to run on larger chunks of + the database for this filtering. This would likely require the most work as it + would involve expanding the indexer workers' API to handle this additional + task. + ## Blockers @@ -621,48 +671,4 @@ monitor the first production data refreshes closely. [^3]: See #4456 for further context on this. The filtering is a _necessary_ step of the data refresh we need to carry forward even after removing the other - cleanup steps. There are a number of ways we can accomplish this, but by far - the easiest would be to directly map the functionality of the ingestion - server on this step within a single Airflow task. The steps for this task - are as follows: - - 1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE` - 2. Divide batch up into `multiprocessing.cpu_count()` subbatches - 3. Split the filtering up into separate workers using multiprocessing - 4. On each process - 1. Create a new DB connection & cursor per worker - 2. Iterate through each record - 1. Remove tags below confidence level - 2. Remove tags that need to be filtered (denylisted, machine-generated - filter list, provider, etc) - 3. Only surface the record if it needs to be changed - 4. Update each records one by one with a single `UPDATE` - 3. Commit cursor and close connection - 5. Repeat steps 1-4 until all batches are consumed - - The Airflow scheduler container has access to 4 cores, which is the same as - the ingestion server where this step was originally running. At present it - takes about 8 hours for all cleanup steps, but that includes the URL - cleaning which is certainly more time intensive than the tag filtering since - it makes outbound requests. Running the tag filtering on Airflow should not - impact any of the other running tasks or saturate the instance. - - There are a few ways this process could be improved, but none of them are - required _at this moment_. We can follow up after this project is complete - to assess what further optimizations might be necessary at this step. Some - potential suggestions for that time: - - - Instead of single `UPDATE` queries for each affected records, we could - insert records from each subbatch to a temporary table. Then the base - table could be updated with an `UPDATE ... FROM` in bulk. Since the - indices haven't been applied to the base table yet, this should be fairly - performant. - - Instead of using multiprocessing, we could pre-define the batches and run - the filtering chunks on a set of mapped tasks. The multiprocessing has the - benefit of iterating over a cursor on the database rather than having to - manage the record ranges explicitly, but this would allow further - parallelization and task management. - - The indexer workers themselves could be expanded to run on larger chunks - of the database for this filtering. This would likely require the most - work as it would involve expanding the indexer workers' API to handle this - additional task. + cleanup steps.