Move footnote content into the primary document
AetherUnbound committed Jun 20, 2024
1 parent e6b0dc8 commit d84ddde
Showing 1 changed file with 52 additions and 46 deletions.
@@ -347,7 +347,24 @@ server's work:
- `Filter Data`: initially, this should be a single python task which exactly
mirrors the behavior of the
[`clean_image_data` function of `cleanup.py`](https://github.com/WordPress/openverse/blob/47fe5df0e9b8ad3dba06021f4cd4af9139977644/ingestion_server/ingestion_server/cleanup.py#L295)
(only applying the tag-specific steps) on the ingestion server[^3]. The
easiest way to do this would be to directly map the functionality of the
ingestion server on this step within a single Airflow task. The steps for this
task are as follows (see [Alternatives](#filtering-approach) for possible
future directions; a rough sketch of this loop appears after the `Create Index`
step below):
1. Get a batch of records from the database using `CLEANUP_BUFFER_SIZE`
2. Divide the batch up into `multiprocessing.cpu_count()` subbatches
3. Split the filtering up into separate workers using multiprocessing
4. On each process
   1. Create a new DB connection & cursor per worker
   2. Iterate through each record
      1. Remove tags below the confidence level
      2. Remove tags that need to be filtered (denylisted, machine-generated
         filter list, provider, etc.)
      3. Only surface the record if it needs to be changed
      4. Update each record one by one with a single `UPDATE`
   3. Commit the cursor and close the connection
5. Repeat steps 1-4 until all batches are consumed
- `Create Index`: we can use our existing
[Elasticsearch tasks](https://github.com/WordPress/openverse/blob/05ff48d05f2163104151c5589cf352a156bc6a97/catalog/dags/common/elasticsearch.py#L82)
to create the new elasticsearch index with the index suffix generated in the
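
To make the filtering steps above concrete, here is a rough sketch of that
loop, assuming `psycopg2`, an `image` table with a JSONB `tags` column, and
placeholder values for the connection string, buffer size, confidence
threshold, and denylists (none of these reflect the real catalog
configuration). In practice this logic would live inside the Airflow task
rather than a standalone script.

```python
import multiprocessing

import psycopg2
from psycopg2.extras import Json

DB_DSN = "dbname=openverse user=deploy host=localhost"  # placeholder connection string
CLEANUP_BUFFER_SIZE = 10_000  # records fetched per outer batch (assumed value)
TAG_MIN_CONFIDENCE = 0.9  # tags below this accuracy are removed (assumed value)
TAG_DENYLIST = {"tag-to-exclude"}  # illustrative placeholder
PROVIDER_DENYLIST = {"provider-to-exclude"}  # illustrative placeholder


def filter_tags(tags):
    """Return the filtered tag list, or None if the record needs no change."""
    original = tags or []
    filtered = [
        tag
        for tag in original
        if tag.get("accuracy", 1) >= TAG_MIN_CONFIDENCE
        and tag.get("name", "").lower() not in TAG_DENYLIST
        and tag.get("provider") not in PROVIDER_DENYLIST
    ]
    return filtered if filtered != original else None


def process_subbatch(rows):
    """Worker: open a dedicated connection and update changed records one by one."""
    conn = psycopg2.connect(DB_DSN)
    with conn, conn.cursor() as cur:
        for identifier, tags in rows:
            new_tags = filter_tags(tags)
            if new_tags is None:
                continue  # only touch records that actually need a change
            cur.execute(
                "UPDATE image SET tags = %s WHERE identifier = %s",
                (Json(new_tags), identifier),
            )
    conn.close()


def clean_tags():
    """Outer loop: fetch batches with a server-side cursor and fan out to workers."""
    worker_count = multiprocessing.cpu_count()
    conn = psycopg2.connect(DB_DSN)
    with conn, conn.cursor(name="tag_cleanup") as cur:
        cur.itersize = CLEANUP_BUFFER_SIZE
        cur.execute("SELECT identifier, tags FROM image")
        with multiprocessing.Pool(worker_count) as pool:
            while True:
                batch = cur.fetchmany(CLEANUP_BUFFER_SIZE)
                if not batch:
                    break
                # Divide the batch into one subbatch per available core
                size = -(-len(batch) // worker_count)  # ceiling division
                subbatches = [
                    batch[i : i + size] for i in range(0, len(batch), size)
                ]
                pool.map(process_subbatch, subbatches)
    conn.close()


if __name__ == "__main__":
    clean_tags()
```
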
@@ -565,6 +582,8 @@ No new tools or packages are required.

<!-- Describe any alternatives considered and why they were not chosen or recommended. -->

### ECS approach

The alternative options of using an ECS approach or performing the reindex
entirely in Airflow are discussed at length in the
[Approach to the Distributed Reindex](#approach-to-the-distributed-reindex)
@@ -575,6 +594,37 @@ using EC2 operators to start and stop the instances as needed. However, more
infrastructure work is required in this approach, and we would require
deployments whenever there are code changes in the indexer workers.
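
For illustration only, a DAG using the Amazon provider's EC2 operators might
look roughly like the sketch below (assuming a recent Airflow 2.x with the
`apache-airflow-providers-amazon` package; the DAG ID, instance ID, and the
reindex-trigger task are placeholders, not existing Openverse code).

```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.ec2 import (
    EC2StartInstanceOperator,
    EC2StopInstanceOperator,
)

INDEXER_WORKER_INSTANCE_ID = "i-0123456789abcdef0"  # placeholder instance ID


@dag(
    dag_id="ec2_indexer_worker_example",  # placeholder DAG ID
    start_date=datetime(2024, 1, 1),
    schedule=None,
)
def ec2_indexer_worker_example():
    start_worker = EC2StartInstanceOperator(
        task_id="start_indexer_worker",
        instance_id=INDEXER_WORKER_INSTANCE_ID,
    )

    @task
    def trigger_reindex():
        # Call the indexer worker's API to kick off and await the reindex (stub)
        ...

    stop_worker = EC2StopInstanceOperator(
        task_id="stop_indexer_worker",
        instance_id=INDEXER_WORKER_INSTANCE_ID,
        trigger_rule="all_done",  # stop the instance even if the reindex fails
    )

    start_worker >> trigger_reindex() >> stop_worker


ec2_indexer_worker_example()
```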

### Filtering approach

There are a number of ways to accomplish the data filtering, including several
ways to improve the approach described above.

The Airflow scheduler container has access to 4 cores, which is the same as the
ingestion server where this step was originally running. At present, it takes
about 8 hours for all cleanup steps, but that includes the URL cleaning which is
certainly more time intensive than the tag filtering since it makes outbound
requests. Running the tag filtering on Airflow should not impact any of the
other running tasks or saturate the instance.

There are a few ways this process could be improved, but none of them are
required _at this moment_. We can follow up after this project is complete to
assess what further optimizations might be necessary at this step. Some
potential suggestions for that time:

- Instead of a single `UPDATE` query for each affected record, we could insert
  records from each subbatch into a temporary table. The base table could then
  be updated in bulk with an `UPDATE ... FROM` (a rough sketch follows this
  list). Since the indices haven't been applied to the base table yet, this
  should be fairly performant.
- Instead of using multiprocessing, we could pre-define the batches and run the
  filtering chunks on a set of mapped tasks (also sketched after this list).
  Multiprocessing has the benefit of iterating over a cursor on the database
  rather than having to manage record ranges explicitly, but mapped tasks would
  allow further parallelization and task management.
- The indexer workers themselves could be expanded to run on larger chunks of
the database for this filtering. This would likely require the most work as it
would involve expanding the indexer workers' API to handle this additional
task.
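
A rough sketch of the temporary-table variant, reusing the same placeholder
`image` table and connection string as the earlier sketch; the temporary table
name and column types are assumptions.

```python
import psycopg2
from psycopg2.extras import Json, execute_values

DB_DSN = "dbname=openverse user=deploy host=localhost"  # placeholder connection string


def bulk_update_tags(filtered_rows):
    """filtered_rows: list of (identifier, new_tags) pairs for changed records."""
    conn = psycopg2.connect(DB_DSN)
    with conn, conn.cursor() as cur:
        # Stage the changed records from this subbatch in a temporary table
        cur.execute(
            "CREATE TEMP TABLE tag_updates (identifier uuid, tags jsonb) ON COMMIT DROP"
        )
        execute_values(
            cur,
            "INSERT INTO tag_updates (identifier, tags) VALUES %s",
            [(identifier, Json(tags)) for identifier, tags in filtered_rows],
        )
        # Apply every change in one statement instead of per-record UPDATEs
        cur.execute(
            """
            UPDATE image
            SET tags = tag_updates.tags
            FROM tag_updates
            WHERE image.identifier = tag_updates.identifier
            """
        )
    conn.close()
```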
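
And a rough sketch of the mapped-tasks variant using Airflow dynamic task
mapping; the DAG ID, the batch sizing, and the body of the per-batch filter
task are placeholders.

```python
from datetime import datetime

from airflow.decorators import dag, task

BATCH_SIZE = 100_000  # rows handled by each mapped task (assumed value)


@dag(
    dag_id="filter_tags_mapped_example",  # placeholder DAG ID
    start_date=datetime(2024, 1, 1),
    schedule=None,
)
def filter_tags_mapped_example():
    @task
    def get_batches() -> list[list[int]]:
        # Pre-define explicit id ranges, e.g. derived from SELECT max(id) FROM image
        max_id = 1_000_000  # placeholder; would be queried from the database
        return [[start, start + BATCH_SIZE] for start in range(0, max_id, BATCH_SIZE)]

    @task
    def filter_batch(id_range: list[int]):
        # Run the tag filtering for rows whose id falls in [start, end) (stub)
        start, end = id_range
        ...

    # One mapped task instance per pre-defined batch, parallelized by Airflow
    filter_batch.expand(id_range=get_batches())


filter_tags_mapped_example()
```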

## Blockers

<!-- What hard blockers exist that prevent further work on this project? -->
@@ -621,48 +671,4 @@ monitor the first production data refreshes closely.
[^3]:
See #4456 for further context on this. The filtering is a _necessary_ step
of the data refresh we need to carry forward even after removing the other
cleanup steps.
