Update ingestion server removal IP with EC2 approach (#4615)
* Update documentation with EC2 approach

* Remove outdated/incorrect AMI reference

* Fix formatting to show that terminating instance is done per instance, not in one step

* Fix formatting

Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>

---------

Co-authored-by: Madison Swain-Bowden <bowdenm@spu.edu>
stacimc and AetherUnbound authored Jul 19, 2024
1 parent c0b9087 commit 6198601
Showing 1 changed file with 51 additions and 60 deletions.
@@ -120,10 +120,9 @@ Two other options were evaluated for this IP:

- Keep the EC2 instances as they are now, but connect to them directly from
Airflow. Rather than managing the 8 total EC2 instances directly, we will
instead set up an EC2 launch template for each environment. The data refresh
DAGs will spin up instances using the appropriate launch template and existing
EC2 operators. (More details follow later in this document.)
- Remove the EC2 instances entirely in favor of a new ECS task definition using
the `indexer-worker` image, which would remove all API code and contain only
the reindexing script. Airflow would spin up the appropriate number of ECS
@@ -227,14 +226,11 @@ configuration we can ensure that the latest indexer-worker image is pulled each
time, which means that any changes to the reindexing code will become live in
production as soon as the new docker image is published after merging on `main`.

The EC2 approach uses EC2 operators to achieve a similar result. Because we will
actually terminate (rather than stop) the instances when their work is complete
and create entirely _new_ instances for each data refresh, we can update the
launch template to use the latest Docker image each time an instance is created,
so that the latest code is pulled each time a data refresh starts.
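
For illustration only, the `user_data` attached to the launch template could
look something like the sketch below, shown here as a Python snippet of the
kind that might be templated into the instance configuration. The image name,
run flags, and the idea of passing user data as an API override are assumptions
rather than part of this plan:

```python
import base64

# Placeholder image reference; the real registry/repository is defined by the
# catalog deployment, not by this sketch.
INDEXER_WORKER_IMAGE = "ghcr.io/wordpress/openverse-catalog-indexer-worker:latest"

# Because the tag is `latest`, a brand-new instance launched from the template
# pulls whatever image was most recently published, with no deploy step needed.
USER_DATA = f"""#!/bin/bash
docker pull {INDEXER_WORKER_IMAGE}
docker run --detach --restart unless-stopped {INDEXER_WORKER_IMAGE}
"""

# The EC2 API expects user data to be base64-encoded when supplied directly,
# for example in overrides passed alongside a launch template.
ENCODED_USER_DATA = base64.b64encode(USER_DATA.encode()).decode()
```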

### Conclusion

@@ -277,8 +273,8 @@ developed as separate DAGs alongside the current ones.
indexer worker image locally.
1. Add the distributed reindexing step to the DAG (excludes infrastructure
work).
1. Set up the necessary resources for the EC2 launch templates for staging and
production in the catalog Terraform configuration.
1. Add all remaining steps to the data refresh DAGs:
`Create and Populate Filtered Index`, `Reapply Constraints`,
`Promote Table`, `Promote Index`, `Delete Index`.
@@ -373,8 +369,8 @@ server's work:
### Implement new catalog indexer worker

In this step we will create the new catalog-indexer-worker Docker image. This
step does not include adding the orchestration steps to the DAG, or the related
infrastructure work.
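
The worker's framework and internals are out of scope for this step, but purely
as an illustration of the small HTTP surface the orchestration tasks described
later expect (a healthcheck, a `reindexing_task` trigger, and a `task/{task_id}`
status endpoint), a minimal sketch might look like the following; Flask, the
handler names, and the progress tracking are all hypothetical:

```python
from threading import Thread
from uuid import uuid4

from flask import Flask, jsonify, request

app = Flask(__name__)
# In-memory task state, purely for illustration.
tasks: dict[str, dict] = {}


def run_reindex(task_id: str, start_index: int, end_index: int) -> None:
    # Placeholder for invoking the actual reindexing script over the given slice.
    tasks[task_id].update({"progress": 100, "finished": True})


@app.route("/healthcheck")
def healthcheck():
    # Pinged by the DAG to confirm the instance is up and the API is reachable.
    return jsonify({"status": "ok"})


@app.route("/reindexing_task", methods=["POST"])
def reindexing_task():
    # Receives the slice of the media table this worker is responsible for.
    payload = request.get_json()
    task_id = str(uuid4())
    tasks[task_id] = {"progress": 0, "finished": False}
    Thread(
        target=run_reindex,
        args=(task_id, payload["start_index"], payload["end_index"]),
        daemon=True,
    ).start()
    return jsonify({"task_id": task_id}), 202


@app.route("/task/<task_id>")
def task_status(task_id: str):
    # Polled by the DAG until the reindexing slice reports completion.
    return jsonify(tasks[task_id])
```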

First we will create a new `indexer-worker` directory under
`catalog/dags/data_refresh`, which will contain the contents of the new indexer
@@ -426,58 +422,47 @@ refreshes locally and in production).

In this step we will add tasks to the data refresh DAGs to orchestrate the
distributed reindex. At the end of this step, it will be possible to run a
distributed reindex _locally_. The following code can all be refactored from
[`distributed_reindex_scheduler.py`](https://github.com/WordPress/openverse/blob/main/ingestion_server/ingestion_server/distributed_reindex_scheduler.py).

- Use dynamic task mapping to distribute reindexing across the indexer workers
by first calculating `start` and `end` indices that will split the records in
the media table into even portions, depending on the number of workers
available in the given target environment. Then, for each worker:
- Use the
[`EC2CreateInstanceOperator`](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/ec2/index.html#airflow.providers.amazon.aws.operators.ec2.EC2CreateInstanceOperator)
to create a new EC2 instance. The task returns the id of the created
instance.
- Under the hood, this operator uses boto3's
[`run_instances`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ec2/client/run_instances.html).
We can pass the name or id for the appropriate launch template in the
`config` parameter. The launch template names will be hard-coded
constants.
- Note: it's possible to use this operator to launch all the workers at once
by setting the `min_count` and `max_count` parameters. We will instead be
using dynamic task mapping to create a separate task for creating each
indexer worker individually: this is so that if a single worker fails
later in the process, we can retry that worker in isolation.
- Use the EC2Hook
[`describe_instances`](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/hooks/ec2/index.html#airflow.providers.amazon.aws.hooks.ec2.EC2Hook.describe_instances)
to get the private IP address of the worker instance from its instance id.
- Use a Sensor to ping the worker's healthcheck endpoint, ensuring that the
instance is up and running and the API is accessible.
- POST to the worker's `reindexing_task` endpoint with the `start_index` and
`end_index` it should handle, triggering the reindexing.
- Use a Sensor to ping the worker's `task/{task_id}` endpoint until the task
is complete, logging the progress as it goes.
- Finally, use the
[`EC2TerminateInstanceOperator`](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/ec2/index.html#airflow.providers.amazon.aws.operators.ec2.EC2TerminateInstanceOperator)
to terminate the instance. Make sure to use the
[`NONE_SKIPPED` TriggerRule](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#trigger-rules)
to ensure that the instances are terminated, even if there are upstream
failures. (Skips in local env.)
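
To make the per-worker sequence above concrete, here is a rough sketch of how
it could be wired together with dynamic task mapping. It deliberately
substitutes plain boto3 and `requests` calls inside `@task` functions for the
Amazon provider operators and Sensors named above, and the launch template
name, worker count, record count, port, and payload shapes are all assumptions:

```python
import time

import boto3
import pendulum
import requests
from airflow.decorators import dag, task
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical values; the real names and sizes live in the catalog configuration.
LAUNCH_TEMPLATE_NAME = "production-indexer-worker"
WORKER_COUNT = 6
RECORD_COUNT = 500_000_000
WORKER_API_PORT = 8003  # Placeholder port for the indexer worker API.


@task
def get_worker_ranges() -> list[dict]:
    """Split the media table into even (start_index, end_index) slices."""
    step = RECORD_COUNT // WORKER_COUNT
    return [
        {
            "start_index": i * step,
            "end_index": RECORD_COUNT if i == WORKER_COUNT - 1 else (i + 1) * step,
        }
        for i in range(WORKER_COUNT)
    ]


@task
def create_worker(worker_range: dict) -> dict:
    """Stand-in for EC2CreateInstanceOperator: launch one instance from the template."""
    response = boto3.client("ec2").run_instances(
        LaunchTemplate={"LaunchTemplateName": LAUNCH_TEMPLATE_NAME},
        MinCount=1,
        MaxCount=1,
    )
    return {"instance_id": response["Instances"][0]["InstanceId"], **worker_range}


@task
def trigger_reindex(worker: dict) -> dict:
    """Wait for the worker API, trigger its slice, and poll until it finishes."""
    reservations = boto3.client("ec2").describe_instances(
        InstanceIds=[worker["instance_id"]]
    )
    ip = reservations["Reservations"][0]["Instances"][0]["PrivateIpAddress"]
    base = f"http://{ip}:{WORKER_API_PORT}"

    # The plan uses Sensors for these waits; blocking loops keep the sketch short.
    while True:
        try:
            if requests.get(f"{base}/healthcheck", timeout=5).ok:
                break
        except requests.ConnectionError:
            pass
        time.sleep(10)

    payload = {"start_index": worker["start_index"], "end_index": worker["end_index"]}
    task_id = requests.post(f"{base}/reindexing_task", json=payload).json()["task_id"]
    while not requests.get(f"{base}/task/{task_id}", timeout=5).json()["finished"]:
        time.sleep(60)
    return worker


@task(trigger_rule=TriggerRule.NONE_SKIPPED)
def terminate_worker(worker: dict) -> None:
    """Stand-in for EC2TerminateInstanceOperator; must run even after failures."""
    boto3.client("ec2").terminate_instances(InstanceIds=[worker["instance_id"]])


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def distributed_reindex_sketch():
    ranges = get_worker_ranges()
    workers = create_worker.expand(worker_range=ranges)
    # In the real DAGs each worker's create/reindex/terminate chain would sit in a
    # mapped task group, so termination still runs if that worker's reindex fails.
    terminate_worker.expand(worker=trigger_reindex.expand(worker=workers))


distributed_reindex_sketch()
```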

### Create the Terraform and Ansible resources needed to deploy the new indexer workers

In this step we will add the launch templates needed to actually create the EC2
instances.

Some important notes:

@@ -486,9 +471,8 @@ Some important notes:
separately from the 6 production workers). It is more accurate to view all 8
workers as production instances (i.e., part of the production catalog
deployment), which merely _operate_ on different environments. As such all 8
should be part of the production deployment, but there will be two separate
launch templates which are tagged to indicate their respective environment.
- The playbooks must be updated to check if any of the four _new_ data refresh
DAGs are running before deploying, as well.
- The `user_data` script should be updated to pull the Docker image with the
@@ -625,6 +609,13 @@ potential suggestions for that time:
would involve expanding the indexer workers' API to handle this additional
task.

### ASG approach

An earlier draft of this implementation plan used an EC2 Auto Scaling group for
each environment. Airflow would call `set_desired_capacity` to set the ASG's
capacity, and the ASG itself would then spin up (and down) EC2 instances. A
disadvantage of this approach was that it made it impossible to retry a single
indexer worker in isolation.

## Blockers

<!-- What hard blockers exist that prevent further work on this project? -->