Opensearch Dev Environment #1010

Merged: 8 commits, Jun 25, 2024
3 changes: 1 addition & 2 deletions README.md
@@ -208,8 +208,7 @@ export MOUNT_CODEBASE=<path to rikolti, for example: /Users/awieliczka/Projects/
In order to run the indexer code, make sure the following variables are set:

```
export RIKOLTI_ES_ENDPOINT= # ask for endpoint url
export INDEX_RETENTION=0
export OPENSEARCH_ENDPOINT= # ask for endpoint url
```

Also make sure to set your temporary AWS credentials and the region so that the mwaa-local-runner container can authenticate when talking to the OpenSearch API:
21 changes: 0 additions & 21 deletions dags/dev_dags/index_to_prod_dag.py

This file was deleted.

4 changes: 2 additions & 2 deletions dags/dev_dags/index_to_stage_dag.py
@@ -3,7 +3,7 @@
from airflow.decorators import dag, task
from airflow.models.param import Param

from rikolti.dags.shared_tasks.indexing_tasks import create_stage_index_task
from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task
from rikolti.dags.shared_tasks.shared import get_registry_data_task
from rikolti.utils.versions import get_merged_pages, get_with_content_urls_pages

@@ -36,6 +36,6 @@ def get_version_pages(params=None):
def index_collection_to_stage_dag():
collection = get_registry_data_task()
version_pages = get_version_pages()
index_name = create_stage_index_task(collection, version_pages) # noqa F841
index_name = update_stage_index_for_collection_task(collection, version_pages) # noqa F841

index_collection_to_stage_dag()
47 changes: 1 addition & 46 deletions dags/shared_tasks/indexing_tasks.py
@@ -5,46 +5,9 @@

from rikolti.dags.shared_tasks.shared import notify_rikolti_failure
from rikolti.dags.shared_tasks.shared import send_event_to_sns
from rikolti.record_indexer.create_collection_index import create_index_name
from rikolti.record_indexer.create_collection_index import create_new_index
from rikolti.record_indexer.create_collection_index import delete_index
from rikolti.record_indexer.move_index_to_prod import move_index_to_prod
from rikolti.record_indexer.update_stage_index import update_stage_index_for_collection
from rikolti.utils.versions import get_version

@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
def create_stage_index_task(
collection: dict, version_pages: list[str], **context):
collection_id = collection.get('id')
if not collection_id:
raise ValueError(
f"Collection ID not found in collection metadata: {collection}")

index_name = create_index_name(collection_id)
try:
create_new_index(collection_id, version_pages, index_name)
except Exception as e:
delete_index(index_name)
raise e

version = get_version(collection_id, version_pages[0])
dashboard_query = {"query": {
"bool": {"filter": {"terms": {"collection_url": [collection_id]}}}
}}
print(
f"\n\nReview indexed records at: https://rikolti-data.s3.us-west-2."
f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n"
f"Or on opensearch at: {os.environ.get('RIKOLTI_ES_ENDPOINT')}"
"/_dashboards/app/dev_tools#/console with query:\n"
f"{json.dumps(dashboard_query, indent=2)}\n\n\n"
)

send_event_to_sns(context, {'index_name': index_name})
return index_name

# this task has the same task_id as create_stage_index_task() so that we don't
# break the Airflow history with the transition between using that task
# and this one as the last step in the harvest_dag
@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
def update_stage_index_for_collection_task(
collection: dict, version_pages: list[str], **context):
@@ -67,17 +30,9 @@ def update_stage_index_for_collection_task(
print(
f"\n\nReview indexed records at: https://rikolti-data.s3.us-west-2."
f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n"
f"Or on opensearch at: {os.environ.get('RIKOLTI_ES_ENDPOINT')}"
f"Or on opensearch at: {os.environ.get('OPENSEARCH_ENDPOINT')}"
"/_dashboards/app/dev_tools#/console with query:\n"
f"{json.dumps(dashboard_query, indent=2)}\n\n\n"
)

send_event_to_sns(context, {'record_indexer_success': 'success'})

@task(task_id="move_index_to_prod")
def move_index_to_prod_task(collection: dict):
collection_id = collection.get('id')
if not collection_id:
raise ValueError(
f"Collection ID not found in collection metadata: {collection}")
move_index_to_prod(collection_id)
4 changes: 1 addition & 3 deletions env.example
@@ -35,10 +35,8 @@ export NUXEO_PASS=
export CONTENT_ROOT=file:///usr/local/airflow/rikolti_content

# indexer
export RIKOLTI_ES_ENDPOINT= # ask for endpoint url
export OPENSEARCH_ENDPOINT= # ask for endpoint url
export RIKOLTI_HOME=/usr/local/airflow/dags/rikolti
export INDEX_RETENTION=0 # number of unaliased indices to retain during cleanup
export RIKOLTI_ES_ALIAS_STAGE=rikolti-stg # stage index alias

# indexer when run locally via aws-mwaa-local-runner
# export AWS_ACCESS_KEY_ID=
66 changes: 56 additions & 10 deletions record_indexer/README.md
@@ -1,35 +1,81 @@
## Create OpenSearch index template
# Indexing

To create the [index template](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/index-templates.html) for rikolti:
We push all records that have been run through the Rikolti harvesting pipeline into an OpenSearch index.

Make sure that RIKOLTI_ES_ENDPOINT is set in your environment.
Records must adhere strictly to the fields specified in [our index template](index_templates/record_index_config.py). Please review [OpenSearch's documentation on index templates](https://opensearch.org/docs/latest/im-plugin/index-templates/) for more information.

Our `record_indexer` component is designed to remove any fields that are not in our index template. The `record_indexer` indexes records by collection into indices identified by aliases.
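The field-stripping behavior described above can be sketched as follows (a hypothetical helper with an illustrative field list, not the actual `record_indexer` code):

```python
# Sketch of dropping fields not present in the index template.
# TEMPLATE_FIELDS is a hypothetical subset; the real template defines many more.
TEMPLATE_FIELDS = {"id", "title", "collection_url", "type"}

def strip_unknown_fields(record: dict) -> dict:
    """Return a copy of record containing only template-defined fields."""
    return {k: v for k, v in record.items() if k in TEMPLATE_FIELDS}

record = {"id": "123", "title": "A Photo", "internal_note": "drop me"}
print(strip_unknown_fields(record))  # {'id': '123', 'title': 'A Photo'}
```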

## Configuring the Record Indexer - AWS and Docker Options

The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT`, the API endpoint for an OpenSearch instance. Rikolti supports authenticating against an AWS-hosted OpenSearch endpoint (via IAM permissions and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container.

### AWS Hosted OpenSearch
If you're trying to set up the record_indexer to communicate with an AWS-hosted OpenSearch instance, set `OPENSEARCH_ENDPOINT` to the AWS-provided endpoint. Make sure your machine or your AWS account has access and, if relevant, set the following environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_REGION`.

### Dev OpenSearch Docker Container
There is also an OpenSearch dev environment docker compose file available. You can run `docker-compose up` to start up an OpenSearch instance with API access at https://localhost:9200. The default username is `admin` and the default password is `Rikolti_05`. See the [OpenSearch Docker container documentation](https://hub.docker.com/r/opensearchproject/opensearch) for details.

Send requests to the OpenSearch REST API to verify the docker container is working.

> By default, OpenSearch uses self-signed TLS certificates. The `-k` short option skips the certificate verification step so requests don't fail.

```
python -m record_indexer.index_templates.rikolti_template
curl -X GET "https://localhost:9200/_cat/indices" -ku admin:Rikolti_05
```

To use this docker container with the record indexer, you will have to configure:

```
export OPENSEARCH_USER=admin
export OPENSEARCH_PASS=Rikolti_05
export OPENSEARCH_IGNORE_TLS=True
```
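A settings module can branch on these variables to choose between basic auth and AWS request signing; the sketch below is hypothetical and the real `record_indexer.settings` implementation may differ:

```python
import os

# Hypothetical sketch of how indexer settings might interpret the
# OPENSEARCH_* variables; not the actual record_indexer.settings code.
def get_auth():
    user = os.environ.get("OPENSEARCH_USER")
    password = os.environ.get("OPENSEARCH_PASS")
    if user and password:
        return (user, password)  # basic auth for the dev container
    return None  # fall back to AWS request signing

def verify_certs():
    # Dev containers use self-signed certs, so allow opting out of verification
    return os.environ.get("OPENSEARCH_IGNORE_TLS", "").lower() not in ("true", "1")

os.environ.update({"OPENSEARCH_USER": "admin",
                   "OPENSEARCH_PASS": "Rikolti_05",
                   "OPENSEARCH_IGNORE_TLS": "True"})
print(get_auth(), verify_certs())  # ('admin', 'Rikolti_05') False
```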

**To connect to this OpenSearch docker container from mwaa-local-runner, set the previous values and the below endpoint in dags/startup.sh:**

```
export OPENSEARCH_ENDPOINT=https://host.docker.internal:9200/
```

**To use this OpenSearch docker container from your host machine, set the previous values and the below endpoint in env.local:**

```
export OPENSEARCH_ENDPOINT=https://localhost:9200/
```

This creates a template that will be used whenever an index with name matching `rikolti*` is added to the cluster.
## Initializing an OpenSearch instance to work with Rikolti

## Run indexer from command line
Make sure that `OPENSEARCH_ENDPOINT` and the relevant authentication variables are set in your environment.

Create a new index for a collection and add it to the `rikolti-stg` alias:
1. Create an index template for rikolti:

```
python -m record_indexer.create_collection_index <collection_id>
python -m record_indexer.index_templates.rikolti_template
```

Add the current stage index for a collection to the `rikolti-prd` alias:
This creates a record template that will be applied whenever an index with a name matching `rikolti*` is added to the cluster.

2. Create an index and add it to the alias `rikolti-stg`

```
python -m record_indexer.move_index_to_prod <collection_id>
python -m record_indexer.initialize.add_rikolti-stg_index
```

This creates an empty index named `rikolti-<current timestamp>` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.
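Under the hood, assigning an index to an alias goes through the OpenSearch `_aliases` API; the request body looks roughly like the following (the timestamped index name is illustrative):

```json
{
  "actions": [
    {"add": {"index": "rikolti-20240625120000", "alias": "rikolti-stg"}}
  ]
}
```

Aliases let downstream consumers always query `rikolti-stg` without knowing the current timestamped index name.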

## Running the Record Indexer

TODO: We don't currently support running the indexer from the command line

## Indexer development using aws-mwaa-local-runner

See the Rikolti README page section on [Airflow Development](https://github.com/ucldc/rikolti/#airflow-development). In particular, make sure that indexer-related env vars are set as described there.

## Index lifecycle

TODO: this section is all pretty defunct now; update documentation

The lifecycle of an index is as follows:

#### Create new index
8 changes: 7 additions & 1 deletion record_indexer/add_page_to_index.py
@@ -18,7 +18,13 @@ def bulk_add(records: list, index: str):

headers = {"Content-Type": "application/json"}

r = requests.post(url, headers=headers, data=data, auth=settings.get_auth())
r = requests.post(
url,
headers=headers,
data=data,
auth=settings.get_auth(),
verify=settings.verify_certs()
)
if not (200 <= r.status_code <= 299):
print_opensearch_error(r, url)
r.raise_for_status()
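For reference, the `_bulk` body that `bulk_add` posts is newline-delimited JSON, alternating an action line with a document line and ending with a newline; a minimal sketch (record fields illustrative, not the actual helper in this module):

```python
import json

def build_bulk_payload(records: list, index: str) -> str:
    """Build an NDJSON _bulk body: an action line before each document line.
    The trailing newline is required by the _bulk API."""
    lines = []
    for record in records:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"

payload = build_bulk_payload([{"id": "1", "title": "A Photo"}], "rikolti-stg")
print(payload)
```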
128 changes: 0 additions & 128 deletions record_indexer/create_collection_index.py

This file was deleted.

9 changes: 9 additions & 0 deletions record_indexer/docker-compose.yml
@@ -0,0 +1,9 @@
services:
opensearch-node:
image: opensearchproject/opensearch:latest
ports:
- "9200:9200"
- "9600:9600"
environment:
- "discovery.type=single-node"
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05
1 change: 1 addition & 0 deletions record_indexer/index_templates/rikolti_template.py
@@ -52,6 +52,7 @@ def main():
headers={"Content-Type": "application/json"},
data=json.dumps(RECORD_INDEX_CONFIG),
auth=settings.get_auth(),
verify=settings.verify_certs()
)
if not (200 <= r.status_code <= 299):
print_opensearch_error(r, url)