Index version path #1012

Merged (15 commits) on Jul 11, 2024
README.md (16 changes: 10 additions & 6 deletions)
@@ -205,21 +205,25 @@ If you would like to mount your own codebase to the content_harvester container
export MOUNT_CODEBASE=<path to rikolti, for example: /Users/awieliczka/Projects/rikolti>
```

In order to run the indexer code, make sure the following variables are set:
In order to run the indexer code against an AWS-hosted OpenSearch instance, make sure the following variables are set in startup.sh:

```
export OPENSEARCH_ENDPOINT= # ask for endpoint url
```

Also make sure to set your temporary AWS credentials and the region so that the mwaa-local-runner container can authenticate when talking to the OpenSearch API:

```
export AWS_ACCESS_KEY_ID=
export AWS_SECRET_ACCESS_KEY=
export AWS_SESSION_TOKEN=
export AWS_REGION=us-west-2
```
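
If you authenticate through AWS SSO, newer releases of the AWS CLI v2 can emit these values for you with `aws configure export-credentials --format env`; otherwise, copy the temporary credentials from your usual source.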

If using a local Docker container to run a dev OpenSearch, set the following variables in startup.sh:

```
export OPENSEARCH_ENDPOINT=https://host.docker.internal:9200/
export OPENSEARCH_USER=admin
export OPENSEARCH_PASS="Rikolti_05"
export OPENSEARCH_IGNORE_TLS=True
```
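
Here `OPENSEARCH_IGNORE_TLS=True` presumably tells the indexer to skip certificate verification, since the containerized demo config uses a self-signed certificate, and `host.docker.internal` is how the mwaa-local-runner container reaches a port published on the Docker host.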

Finally, from inside the aws-mwaa-local-runner repo, run `./mwaa-local-env build-image` to build the docker image, and `./mwaa-local-env start` to start the mwaa local environment.

For more information on `mwaa-local-env`, look for instructions in the [ucldc/aws-mwaa-local-runner:README](https://github.com/ucldc/aws-mwaa-local-runner/#readme) to build the docker image, run the container, and do local development.
dags/dev_dags/index_to_stage_dag.py (22 changes: 4 additions & 18 deletions)
@@ -1,25 +1,11 @@
from datetime import datetime

from airflow.decorators import dag, task
from airflow.decorators import dag
from airflow.models.param import Param

from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task
from rikolti.dags.shared_tasks.indexing_tasks import (
stage_collection_task, get_version_pages)
from rikolti.dags.shared_tasks.shared import get_registry_data_task
from rikolti.utils.versions import get_merged_pages, get_with_content_urls_pages


@task(task_id="get_version_pages")
def get_version_pages(params=None):
if not params or not params.get('version'):
raise ValueError("Version path not found in params")
version = params.get('version')

if 'merged' in version:
version_pages = get_merged_pages(version)
else:
version_pages = get_with_content_urls_pages(version)

return version_pages


@dag(
@@ -36,6 +22,6 @@ def get_version_pages(params=None):
def index_collection_to_stage_dag():
collection = get_registry_data_task()
version_pages = get_version_pages()
index_name = update_stage_index_for_collection_task(collection, version_pages) # noqa F841
index_name = stage_collection_task(collection, version_pages) # noqa F841

index_collection_to_stage_dag()
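
Like the publish DAG added below, this dev DAG is meant to be triggered with a run config that supplies the `version` param (and, presumably, a `collection_id` for the registry lookup), either from the Airflow UI's trigger-with-config form or via `airflow dags trigger <dag_id> --conf '{...}'`.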
dags/harvest_dag.py (4 changes: 2 additions & 2 deletions)
@@ -18,7 +18,7 @@
get_child_directories, get_with_content_urls_pages,
get_with_content_urls_page_content, get_child_pages,
create_merged_version, put_merged_page)
from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task
from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task


def get_child_records(version, parent_id) -> list:
@@ -101,7 +101,7 @@ def harvest():
collection, mapped_page_batches)
merged_pages = merge_any_child_records_task(with_content_urls_version)
merged_pages.set_upstream(content_harvest_task)
update_stage_index_for_collection_task(collection, merged_pages)
stage_collection_task(collection, merged_pages)


harvest()
dags/publish_dag.py (31 changes: 31 additions & 0 deletions)
@@ -0,0 +1,31 @@
from datetime import datetime

from airflow.decorators import dag
from airflow.models.param import Param

from rikolti.dags.shared_tasks.indexing_tasks import (
publish_collection_task, get_version_pages)
from rikolti.dags.shared_tasks.shared import get_registry_data_task
from rikolti.dags.shared_tasks.shared import notify_dag_success
from rikolti.dags.shared_tasks.shared import notify_dag_failure


@dag(
dag_id="publish_collection",
schedule=None,
start_date=datetime(2023, 1, 1),
catchup=False,
params={
'collection_id': Param(None, description="Collection ID to publish"),
'version': Param(None, description="Version path to publish")
},
tags=["rikolti"],
on_failure_callback=notify_dag_failure,
on_success_callback=notify_dag_success,
)
def publish_collection_dag():
collection = get_registry_data_task()
version_pages = get_version_pages()
publish_collection_task(collection, version_pages)

publish_collection_dag()
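
A quick way to exercise this new DAG once it's loaded (the conf values here are illustrative placeholders): `airflow dags trigger publish_collection --conf '{"collection_id": "<id>", "version": "<version path>"}'`, or supply the same two params through the Airflow UI's trigger-with-config form.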
dags/shared_tasks/indexing_tasks.py (83 changes: 70 additions & 13 deletions)
@@ -5,20 +5,18 @@

from rikolti.dags.shared_tasks.shared import notify_rikolti_failure
from rikolti.dags.shared_tasks.shared import send_event_to_sns
from rikolti.record_indexer.update_stage_index import update_stage_index_for_collection
from rikolti.utils.versions import get_version

@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
def update_stage_index_for_collection_task(
collection: dict, version_pages: list[str], **context):
from rikolti.record_indexer.index_collection import index_collection
from rikolti.utils.versions import (
get_version, get_merged_pages, get_with_content_urls_pages)

def index_collection_task(alias, collection, version_pages, context):
collection_id = collection.get('id')
if not collection_id:
raise ValueError(
f"Collection ID not found in collection metadata: {collection}")

try:
update_stage_index_for_collection(collection_id, version_pages)
index_collection(alias, collection_id, version_pages)
except Exception as e:
# TODO: implement some rollback exception handling?
raise e
@@ -27,12 +25,71 @@ def update_stage_index_for_collection_task(
dashboard_query = {"query": {
"bool": {"filter": {"terms": {"collection_url": [collection_id]}}}
}}
hr = f"\n{'-'*40}\n"
end = f"\n{'~'*40}\n"
s3_url = (
"https://rikolti-data.s3.us-west-2.amazonaws.com/index.html#"
f"{version}/data/"
)
opensearch_url = (
f"{os.environ.get('OPENSEARCH_ENDPOINT', '').rstrip('/')}/"
"_dashboards/app/dev_tools#/console"
)
calisphere_url = f"/collections/{collection_id}/"
print(alias)
if alias == 'rikolti-prd':
calisphere_url = f"https://calisphere.org{calisphere_url}"
else:
calisphere_url = f"https://calisphere-stage.cdlib.org{calisphere_url}"

print(
f"\n\nReview indexed records at: https://rikolti-data.s3.us-west-2."
f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n"
f"Or on opensearch at: {os.environ.get('OPENSEARCH_ENDPOINT')}"
"/_dashboards/app/dev_tools#/console with query:\n"
f"{json.dumps(dashboard_query, indent=2)}\n\n\n"
f"{hr}Review indexed records at:\n {s3_url}\n\n"
f"On opensearch at:\n {opensearch_url}\nwith query:\n"
f"GET /{alias}/_search\n"
f"{json.dumps(dashboard_query, indent=2)}\n\n"
f"On calisphere at:\n {calisphere_url}\n{end}"
)
verbed = "published" if alias == 'rikolti-prd' else "staged"
> Contributor commented: You verbed the noun verb!

print(
f"{hr}Successfully {verbed} version:\n {version}\nto the "
f"`{alias}` index{end}"
)

send_event_to_sns(context, {
'record_indexer_success': 'success',
'version': version,
'index': alias
})

@task(task_id="get_version_pages")
def get_version_pages(params=None):
if not params or not params.get('version'):
raise ValueError("Version path not found in params")
version = params.get('version')

if 'merged' in version:
version_pages = get_merged_pages(version)
else:
version_pages = get_with_content_urls_pages(version)

return version_pages


@task(
task_id="create_stage_index",
on_failure_callback=notify_rikolti_failure,
pool="rikolti_opensearch_pool")
def stage_collection_task(
collection: dict, version_pages: list[str], **context):

index_collection_task("rikolti-stg", collection, version_pages, context)


@task(
task_id="publish_collection",
on_failure_callback=notify_rikolti_failure,
pool="rikolti_opensearch_pool")
def publish_collection_task(
collection: dict, version_pages: list[str], **context):

send_event_to_sns(context, {'record_indexer_success': 'success'})
index_collection_task("rikolti-prd", collection, version_pages, context)
record_indexer/README.md (18 changes: 13 additions & 5 deletions)
@@ -6,9 +6,13 @@ Records must adhere strictly to the fields specified in [our index template](index_

Our `record_indexer` component is designed to remove any fields that are not in our index template. The `record_indexer` indexes records by collection into indices identified by aliases.

## Configuring the Record Indexer - AWS and Docker Options
## Configuring the Record Indexer

The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT` - the API endpoint for an opensearch instance. Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container
The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT`: the API endpoint for an OpenSearch instance.

Rikolti requires that all requests made to the OpenSearch API via Airflow be pooled, in order to throttle concurrent reindexing requests. All Airflow tasks that make requests to the OpenSearch index specify `pool="rikolti_opensearch_pool"`. These tasks will not run in your Airflow instance until you create the pool in the Airflow UI: navigate to Admin > Pools and create a new pool named `rikolti_opensearch_pool` with at least 1 slot.
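
(If you prefer the CLI to the UI, `airflow pools set rikolti_opensearch_pool 1 "OpenSearch indexing requests"` should create the same pool on an Airflow 2.x install; the pool name must match exactly, or the indexing tasks will sit unscheduled.)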

Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container.
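
As a rough sketch of what the two modes look like from Python (assuming the `requests` and `requests-aws4auth` packages; illustrative only, not necessarily how `record_indexer` wires up its client internally):

```
import os

import requests
from requests_aws4auth import AWS4Auth

endpoint = os.environ["OPENSEARCH_ENDPOINT"].rstrip("/")

if os.environ.get("OPENSEARCH_USER"):
    # Dev Docker container: basic auth against the demo security config,
    # which ships with a self-signed certificate.
    auth = (os.environ["OPENSEARCH_USER"], os.environ["OPENSEARCH_PASS"])
    verify = os.environ.get("OPENSEARCH_IGNORE_TLS") != "True"
else:
    # AWS-hosted domain: SigV4-sign each request using the AWS_* credentials.
    auth = AWS4Auth(
        os.environ["AWS_ACCESS_KEY_ID"],
        os.environ["AWS_SECRET_ACCESS_KEY"],
        os.environ["AWS_REGION"],
        "es",
        session_token=os.environ.get("AWS_SESSION_TOKEN"),
    )
    verify = True

# Either way, OpenSearch API calls are then plain HTTPS requests:
resp = requests.get(f"{endpoint}/_cluster/health", auth=auth, verify=verify)
resp.raise_for_status()
print(resp.json())
```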

### AWS Hosted OpenSearch
If you're trying to set up the record_indexer to communicate with an AWS hosted OpenSearch instance, set the `OPENSEARCH_ENDPOINT` to the AWS-provided endpoint. Make sure your machine or your AWS account has access, and, if relevant, set the following environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_REGION`.
@@ -46,6 +50,8 @@ export OPENSEARCH_ENDPOINT=https://localhost:9200/

## Initializing an OpenSearch instance to work with Rikolti

These initialization instructions must be run against either a new AWS-hosted OpenSearch domain or a new local Docker container.

Make sure that OPENSEARCH_ENDPOINT and the relevant authentication is set in your environment.

1. Create an index template for rikolti:
@@ -56,13 +62,15 @@ python -m record_indexer.index_templates.rikolti_template

This creates a record template that will be used for adding documents to any index added to the cluster with a name matching `rikolti*`.

2. Create an index and add it to the alias `rikolti-stg`
2. Create initial stage and prod indexes and add the aliases `rikolti-stg` and `rikolti-prd` to them, respectively:

```
python -m record_indexer.initialize.add_rikolti-stg_index
python -m record_indexer.initialize.indices_and_aliases
```

This creates an empty index named `rikolti-<current timestamp>` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.
This creates two empty indexes, each named `rikolti-<current timestamp>` (enforcing the use of the rikolti_template for all records indexed into them), and assigns them to the aliases `rikolti-stg` and `rikolti-prd`, respectively.

You can provide your own index names with the `-s`/`--stg-name` and `-p`/`--prd-name` options. The script will prefix whatever name you provide with `rikolti-`.
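
To spot-check the result, a sketch like the following can be run (it assumes the `requests` package, the basic-auth environment variables from the Docker setup above, and that the template was registered under the name `rikolti_template`, which is an assumption about the script's naming):

```
import os

import requests

endpoint = os.environ["OPENSEARCH_ENDPOINT"].rstrip("/")
auth = (os.environ["OPENSEARCH_USER"], os.environ["OPENSEARCH_PASS"])
verify = os.environ.get("OPENSEARCH_IGNORE_TLS") != "True"

# The index template from step 1 (template name is assumed):
tmpl = requests.get(
    f"{endpoint}/_index_template/rikolti_template", auth=auth, verify=verify)
print(tmpl.json())

# The indexes and their aliases from step 2:
aliases = requests.get(
    f"{endpoint}/_cat/aliases/rikolti-*?v", auth=auth, verify=verify)
print(aliases.text)
```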

## Running the Record Indexer

record_indexer/docker-compose.yml (67 changes: 63 additions & 4 deletions)
@@ -1,9 +1,68 @@
---
version: '3'
services:
opensearch-node:
opensearch-node1:
image: opensearchproject/opensearch:latest
container_name: opensearch-node1
environment:
- cluster.name=opensearch-cluster
- node.name=opensearch-node1
- discovery.seed_hosts=opensearch-node1,opensearch-node2
- cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
- bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
- OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05 # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and higher
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
hard: 65536
volumes:
- opensearch-data1:/usr/share/opensearch/data
ports:
- "9200:9200"
- "9600:9600"
- 9200:9200
- 9600:9600 # required for Performance Analyzer
networks:
- opensearch-net
opensearch-node2:
image: opensearchproject/opensearch:latest
container_name: opensearch-node2
environment:
- "discovery.type=single-node"
- cluster.name=opensearch-cluster
- node.name=opensearch-node2
- discovery.seed_hosts=opensearch-node1,opensearch-node2
- cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
- bootstrap.memory_lock=true
- OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
- OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05
ulimits:
memlock:
soft: -1
hard: -1
nofile:
soft: 65536
hard: 65536
volumes:
- opensearch-data2:/usr/share/opensearch/data
networks:
- opensearch-net
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:latest
container_name: opensearch-dashboards
ports:
- 5601:5601
expose:
- '5601'
environment:
OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
networks:
- opensearch-net

volumes:
opensearch-data1:
opensearch-data2:

networks:
opensearch-net:
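
As a quick smoke test of this cluster (assuming a recent Docker Compose), `docker compose up -d` from the `record_indexer` directory brings up both nodes plus Dashboards; once healthy, the cluster should answer at `https://localhost:9200` with basic auth `admin`/`Rikolti_05` (self-signed certificate), and OpenSearch Dashboards at `http://localhost:5601`.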