diff --git a/README.md b/README.md index 4602d025..0a9e3dbb 100644 --- a/README.md +++ b/README.md @@ -205,21 +205,25 @@ If you would like to mount your own codebase to the content_harvester container export MOUNT_CODEBASE= ``` -In order to run the indexer code, make sure the following variables are set: +In order to run the indexer code against an AWS hosted OpenSearch, make sure the following variables are set in startup.sh: ``` export OPENSEARCH_ENDPOINT= # ask for endpoint url -``` - -Also make sure to set your temporary AWS credentials and the region so that the mwaa-local-runner container can authenticate when talking to the OpenSearch API: - -``` export AWS_ACCESS_KEY_ID= export AWS_SECRET_ACCESS_KEY= export AWS_SESSION_TOKEN= export AWS_REGION=us-west-2 ``` +If using a local Docker container to run a dev OpenSearch, set the following variables in startup.sh: + +``` +export OPENSEARCH_ENDPOINT=https://host.docker.internal:9200/ +export OPENSEARCH_USER=admin +export OPENSEARCH_PASS="Rikolti_05" +export OPENSEARCH_IGNORE_TLS=True +``` + Finally, from inside the aws-mwaa-local-runner repo, run `./mwaa-local-env build-image` to build the docker image, and `./mwaa-local-env start` to start the mwaa local environment. For more information on `mwaa-local-env`, look for instructions in the [ucldc/aws-mwaa-local-runner:README](https://github.com/ucldc/aws-mwaa-local-runner/#readme) to build the docker image, run the container, and do local development. diff --git a/dags/dev_dags/index_to_stage_dag.py b/dags/dev_dags/index_to_stage_dag.py index a0995095..a4707ac2 100644 --- a/dags/dev_dags/index_to_stage_dag.py +++ b/dags/dev_dags/index_to_stage_dag.py @@ -1,25 +1,11 @@ from datetime import datetime -from airflow.decorators import dag, task +from airflow.decorators import dag from airflow.models.param import Param -from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task +from rikolti.dags.shared_tasks.indexing_tasks import ( + stage_collection_task, get_version_pages) from rikolti.dags.shared_tasks.shared import get_registry_data_task -from rikolti.utils.versions import get_merged_pages, get_with_content_urls_pages - - -@task(task_id="get_version_pages") -def get_version_pages(params=None): - if not params or not params.get('version'): - raise ValueError("Version path not found in params") - version = params.get('version') - - if 'merged' in version: - version_pages = get_merged_pages(version) - else: - version_pages = get_with_content_urls_pages(version) - - return version_pages @dag( @@ -36,6 +22,6 @@ def get_version_pages(params=None): def index_collection_to_stage_dag(): collection = get_registry_data_task() version_pages = get_version_pages() - index_name = update_stage_index_for_collection_task(collection, version_pages) # noqa F841 + index_name = stage_collection_task(collection, version_pages) # noqa F841 index_collection_to_stage_dag() \ No newline at end of file diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py index 9d572fa0..4231ec9b 100644 --- a/dags/harvest_dag.py +++ b/dags/harvest_dag.py @@ -18,7 +18,7 @@ get_child_directories, get_with_content_urls_pages, get_with_content_urls_page_content, get_child_pages, create_merged_version, put_merged_page) -from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task +from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task def get_child_records(version, parent_id) -> list: @@ -101,7 +101,7 @@ def harvest(): collection, mapped_page_batches) 
merged_pages = merge_any_child_records_task(with_content_urls_version) merged_pages.set_upstream(content_harvest_task) - update_stage_index_for_collection_task(collection, merged_pages) + stage_collection_task(collection, merged_pages) harvest() \ No newline at end of file diff --git a/dags/publish_dag.py b/dags/publish_dag.py new file mode 100644 index 00000000..2cfa6dcb --- /dev/null +++ b/dags/publish_dag.py @@ -0,0 +1,31 @@ +from datetime import datetime + +from airflow.decorators import dag +from airflow.models.param import Param + +from rikolti.dags.shared_tasks.indexing_tasks import ( + publish_collection_task, get_version_pages) +from rikolti.dags.shared_tasks.shared import get_registry_data_task +from rikolti.dags.shared_tasks.shared import notify_dag_success +from rikolti.dags.shared_tasks.shared import notify_dag_failure + + +@dag( + dag_id="publish_collection", + schedule=None, + start_date=datetime(2023, 1, 1), + catchup=False, + params={ + 'collection_id': Param(None, description="Collection ID to publish"), + 'version': Param(None, description="Version path to publish") + }, + tags=["rikolti"], + on_failure_callback=notify_dag_failure, + on_success_callback=notify_dag_success, +) +def publish_collection_dag(): + collection = get_registry_data_task() + version_pages = get_version_pages() + publish_collection_task(collection, version_pages) + +publish_collection_dag() \ No newline at end of file diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py index 19c937cb..675ae8bd 100644 --- a/dags/shared_tasks/indexing_tasks.py +++ b/dags/shared_tasks/indexing_tasks.py @@ -5,20 +5,18 @@ from rikolti.dags.shared_tasks.shared import notify_rikolti_failure from rikolti.dags.shared_tasks.shared import send_event_to_sns -from rikolti.record_indexer.update_stage_index import update_stage_index_for_collection -from rikolti.utils.versions import get_version - -@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure) -def update_stage_index_for_collection_task( - collection: dict, version_pages: list[str], **context): +from rikolti.record_indexer.index_collection import index_collection +from rikolti.utils.versions import ( + get_version, get_merged_pages, get_with_content_urls_pages) +def index_collection_task(alias, collection, version_pages, context): collection_id = collection.get('id') if not collection_id: raise ValueError( f"Collection ID not found in collection metadata: {collection}") try: - update_stage_index_for_collection(collection_id, version_pages) + index_collection(alias, collection_id, version_pages) except Exception as e: # TODO: implement some rollback exception handling? raise e @@ -27,12 +25,71 @@ def update_stage_index_for_collection_task( dashboard_query = {"query": { "bool": {"filter": {"terms": {"collection_url": [collection_id]}}} }} + hr = f"\n{'-'*40}\n" + end = f"\n{'~'*40}\n" + s3_url = ( + "https://rikolti-data.s3.us-west-2.amazonaws.com/index.html#" + f"{version}/data/" + ) + opensearch_url = ( + f"{os.environ.get('OPENSEARCH_ENDPOINT', '').rstrip('/')}/" + "_dashboards/app/dev_tools#/console" + ) + calisphere_url = f"/collections/{collection_id}/" + print(alias) + if alias == 'rikolti-prd': + calisphere_url = f"https://calisphere.org{calisphere_url}" + else: + calisphere_url = f"https://calisphere-stage.cdlib.org{calisphere_url}" + print( - f"\n\nReview indexed records at: https://rikolti-data.s3.us-west-2." 
- f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n" - f"Or on opensearch at: {os.environ.get('OPENSEARCH_ENDPOINT')}" - "/_dashboards/app/dev_tools#/console with query:\n" - f"{json.dumps(dashboard_query, indent=2)}\n\n\n" + f"{hr}Review indexed records at:\n {s3_url}\n\n" + f"On opensearch at:\n {opensearch_url}\nwith query:\n" + f"GET /{alias}/_search\n" + f"{json.dumps(dashboard_query, indent=2)}\n\n" + f"On calisphere at:\n {calisphere_url}\n{end}" ) + verbed = "published" if alias == 'rikolti-prd' else "staged" + print( + f"{hr}Successfully {verbed} version:\n {version}\nto the " + f"`{alias}` index{end}" + ) + + send_event_to_sns(context, { + 'record_indexer_success': 'success', + 'version': version, + 'index': alias + }) + +@task(task_id="get_version_pages") +def get_version_pages(params=None): + if not params or not params.get('version'): + raise ValueError("Version path not found in params") + version = params.get('version') + + if 'merged' in version: + version_pages = get_merged_pages(version) + else: + version_pages = get_with_content_urls_pages(version) + + return version_pages + + +@task( + task_id="create_stage_index", + on_failure_callback=notify_rikolti_failure, + pool="rikolti_opensearch_pool") +def stage_collection_task( + collection: dict, version_pages: list[str], **context): + + index_collection_task("rikolti-stg", collection, version_pages, context) + + +@task( + task_id="publish_collection", + on_failure_callback=notify_rikolti_failure, + pool="rikolti_opensearch_pool") +def publish_collection_task( + collection: dict, version_pages: list[str], **context): - send_event_to_sns(context, {'record_indexer_success': 'success'}) + index_collection_task("rikolti-prd", collection, version_pages, context) \ No newline at end of file diff --git a/record_indexer/README.md b/record_indexer/README.md index 14a62c8b..4b5d3101 100644 --- a/record_indexer/README.md +++ b/record_indexer/README.md @@ -6,9 +6,13 @@ Records must adhere strictly to the fields specified [our index template](index_ Our `record_indexer` component is designed to remove any fields that are not in our index template. The `record_indexer` indexes records by collection into indicies identified by aliases. -## Configuring the Record Indexer - AWS and Docker Options +## Configuring the Record Indexer -The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT` - the API endpoint for an opensearch instance. Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container +The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT` - the API endpoint for an opensearch instance. + +Rikolti requires that all requests made to the OpenSearch API via Airflow be pooled in order to throttle concurrent reindexing requests. All Airflow tasks making requests to the OpenSearch index have `pool="rikolti_opensearch_pool"`. These tasks will not run in your Airflow instance until you create the pool using the Airflow UI, by navigating to Admin > Pools, then creating a new pool called `rikolti_opensearch_pool` with at least 1 Slot. + +Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container. 
### AWS Hosted OpenSearch

If you're trying to set up the record_indexer to communicate with an AWS hosted OpenSearch instance, set the `OPENSEARCH_ENDPOINT` to the AWS-provided endpoint. Make sure your machine or your AWS account has access, and, if relevant, set the following environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_REGION`.
@@ -46,6 +50,8 @@ export OPENSEARCH_ENDPOINT=https://localhost:9200/

## Initializing an OpenSearch instance to work with Rikolti

+These initialization instructions must be run for either a new AWS hosted OpenSearch domain or a new local Docker container.
+
Make sure that OPENSEARCH_ENDPOINT and the relevant authentication is set in your environment.

1. Create an index template for rikolti:
@@ -56,13 +62,15 @@ python -m record_indexer.index_templates.rikolti_template

This creates a record template that will be used for adding documents to any index with name matching `rikolti*` is added to the cluster.

-2. Create an index and add it to the alias `rikolti-stg`
+2. Create initial stage and prod indexes and add the aliases `rikolti-stg` and `rikolti-prd` to them, respectively:

```
-python -m record_indexer.initialize.add_rikolti-stg_index
+python -m record_indexer.initialize.indices_and_aliases
```

-This creates an empty index named `rikolti-` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.
+This creates two empty indexes named `rikolti-<name>` (enforcing the use of the rikolti_template for all records indexed into them) and assigns them to the aliases `rikolti-stg` and `rikolti-prd`, respectively.
+
+You can provide your own index names using the `-s`/`--stg-name` and `-p`/`--prd-name` options. The script will prefix whatever name you provide with `rikolti-`.
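For example, a run that names the indexes explicitly might look like the sketch below (the `stg-2024`/`prd-2024` names are illustrative only; omitting the options falls back to a timestamp):

```
python -m record_indexer.initialize.indices_and_aliases --stg-name stg-2024 --prd-name prd-2024
```

This would create `rikolti-stg-2024` and `rikolti-prd-2024` and alias them to `rikolti-stg` and `rikolti-prd`, respectively.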
## Running the Record Indexer diff --git a/record_indexer/docker-compose.yml b/record_indexer/docker-compose.yml index b4492e67..64933fcb 100644 --- a/record_indexer/docker-compose.yml +++ b/record_indexer/docker-compose.yml @@ -1,9 +1,68 @@ +--- +version: '3' services: - opensearch-node: + opensearch-node1: image: opensearchproject/opensearch:latest + container_name: opensearch-node1 + environment: + - cluster.name=opensearch-cluster + - node.name=opensearch-node1 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping + - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m # minimum and maximum Java heap size, recommend setting both to 50% of system RAM + - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05 # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and higher + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems + hard: 65536 + volumes: + - opensearch-data1:/usr/share/opensearch/data ports: - - "9200:9200" - - "9600:9600" + - 9200:9200 + - 9600:9600 # required for Performance Analyzer + networks: + - opensearch-net + opensearch-node2: + image: opensearchproject/opensearch:latest + container_name: opensearch-node2 environment: - - "discovery.type=single-node" + - cluster.name=opensearch-cluster + - node.name=opensearch-node2 + - discovery.seed_hosts=opensearch-node1,opensearch-node2 + - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 + - bootstrap.memory_lock=true + - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05 + ulimits: + memlock: + soft: -1 + hard: -1 + nofile: + soft: 65536 + hard: 65536 + volumes: + - opensearch-data2:/usr/share/opensearch/data + networks: + - opensearch-net + opensearch-dashboards: + image: opensearchproject/opensearch-dashboards:latest + container_name: opensearch-dashboards + ports: + - 5601:5601 + expose: + - '5601' + environment: + OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]' + networks: + - opensearch-net + +volumes: + opensearch-data1: + opensearch-data2: + +networks: + opensearch-net: diff --git a/record_indexer/index_collection.py b/record_indexer/index_collection.py new file mode 100644 index 00000000..80911554 --- /dev/null +++ b/record_indexer/index_collection.py @@ -0,0 +1,133 @@ +import json +import requests +from datetime import datetime +from typing import Any + +from .index_page import index_page +from . 
import settings +from .utils import print_opensearch_error +from rikolti.utils.versions import get_version + + +def index_collection(alias: str, collection_id: str, version_pages: list[str]): + ''' + find 1 index at alias and update it with records from version_pages + ''' + index = get_index_for_alias(alias) + + version_path = get_version(collection_id, version_pages[0]) + rikolti_data = { + "version_path": version_path, + "indexed_at": datetime.now().isoformat(), + } + + # add pages of records to index + for version_page in version_pages: + # index page of records - the index action creates a document if + # it doesn't exist, and replaces the document if it does + index_page(version_page, index, rikolti_data) + + # delete existing records + delete_collection_records_from_index(collection_id, index, version_path) + + +def get_index_for_alias(alias: str): + # for now, there should be only one index per alias (stage, prod) + url = f"{settings.ENDPOINT}/_alias/{alias}" + r = requests.get( + url, + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if not (200 <= r.status_code <= 299): + print_opensearch_error(r, url) + r.raise_for_status() + aliased_indices = [key for key in r.json().keys()] + if len(aliased_indices) != 1: + raise ValueError( + f"Alias `{alias}` has {len(aliased_indices)} aliased indices. There should be 1.") + else: + return aliased_indices[0] + + +def get_outdated_versions(index:str, query: dict[str, Any]): + url = f"{settings.ENDPOINT}/{index}/_search" + headers = {"Content-Type": "application/json"} + + data = dict(query, **{ + "aggs": { + "version_paths": { + "terms": { + "field": "rikolti.version_path", + "size": 10 + } + } + }, + "track_total_hits": True, + "size": 0 + }) + + r = requests.post( + url=url, + data=json.dumps(data), + headers=headers, + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if not (200 <= r.status_code <= 299): + print_opensearch_error(r, url) + r.raise_for_status() + + return r.json() + + +def delete_collection_records_from_index( + collection_id: str, index: str, version_path: str): + """ + Delete records from index that have the same collection_id but an outdated + version_path + """ + data = { + "query": { + "bool": { + "must": {"term": {"collection_id": collection_id}}, + "must_not": {"term": {"rikolti.version_path": version_path}}, + } + } + } + + outdated = get_outdated_versions(index, data) + num_outdated_records = outdated.get( + 'hits', {}).get('total', {}).get('value', 0) + oudated_versions = outdated.get( + 'aggregations', {}).get('version_paths', {}).get('buckets') + + if num_outdated_records > 0: + hr = "\n" + "-"*40 + "\n" + end = "\n" + "~"*40 + "\n" + message = ( + f"{hr}> Deleting {num_outdated_records} outdated record(s) from " + f"collection {collection_id} in `{index}` index.\n" + f"{'records':>8}: outdated versions\n" + ) + for v in oudated_versions: + message += (f"{v.get('doc_count'):>8}: {v.get('key')}\n") + message += f"New indexed documents have version: {version_path}{end}" + print(message) + + url = f"{settings.ENDPOINT}/{index}/_delete_by_query" + r = requests.post( + url=url, + data=json.dumps(data), + headers={"Content-Type": "application/json"}, + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if not (200 <= r.status_code <= 299): + print_opensearch_error(r, url) + r.raise_for_status() + print(f"{hr}> Deletion results:\n{json.dumps(r.json(), indent=2)}{end}") + else: + print(f"No outdated records found for collection {collection_id} in `{index}` index.") + + 
return diff --git a/record_indexer/add_page_to_index.py b/record_indexer/index_page.py similarity index 81% rename from record_indexer/add_page_to_index.py rename to record_indexer/index_page.py index bd1686f2..f3124476 100644 --- a/record_indexer/add_page_to_index.py +++ b/record_indexer/index_page.py @@ -22,6 +22,7 @@ def bulk_add(records: list, index: str): url, headers=headers, data=data, + params={"refresh": "true"}, auth=settings.get_auth(), verify=settings.verify_certs() ) @@ -60,7 +61,7 @@ def build_bulk_request_body(records: list, index: str): for record in records: doc_id = record.get("id") - action = {"create": {"_index": index, "_id": doc_id}} + action = {"index": {"_index": index, "_id": doc_id}} body += f"{json.dumps(action)}\n{json.dumps(record)}\n" @@ -93,7 +94,7 @@ def get_expected_fields(): return expected_fields -def add_page(version_page: str, index: str): +def index_page(version_page: str, index: str, rikolti_data: dict): if 'merged' in version_page: records = get_merged_page_content(version_page) else: @@ -107,19 +108,29 @@ def add_page(version_page: str, index: str): for field in removed_fields: removed_fields_report[field].append(calisphere_id) + record['rikolti'] = dict(record.get('rikolti', {}), **rikolti_data) + record['rikolti']['page'] = version_page.split('/')[-1] bulk_add(records, index) - print( - f"added {len(records)} records to index `{index}` from " - f"page `{version_page}`" + start = "\n" + "-"*40 + "\n" + end = "\n" + "~"*40 + "\n" + + message = ( + f"{start}Indexed {len(records)} records to index `{index}` from " + f"page `{version_page}`\n" ) for field, calisphere_ids in removed_fields_report.items(): if len(calisphere_ids) != len(records): - print( - f" {len(calisphere_ids)} items had {field} " - f"removed: `{calisphere_ids}`" + message += ( + f"{' '*5}{len(calisphere_ids)} items had {field} " + f"removed: `{calisphere_ids}`\n" ) else: - print(f" all {len(records)} records had {field} field removed") + message += ( + f"{' '*5}all {len(records)} records had {field} field " + "removed\n" + ) + message += end + print(message) diff --git a/record_indexer/index_templates/record_index_config.py b/record_indexer/index_templates/record_index_config.py index 233f4810..a023f53f 100644 --- a/record_indexer/index_templates/record_index_config.py +++ b/record_indexer/index_templates/record_index_config.py @@ -68,7 +68,7 @@ "spatial": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}}, "subject": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}}, "temporal": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}}, - "type": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}}, + "type": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword", "normalizer": "lowercase_trim"}}}, "sort_title": {"type": "keyword", "normalizer": "lowercase_trim"}, "facet_decade": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}}, @@ -96,6 +96,13 @@ "url_item": {"type": "keyword"}, "fetcher_type": {"type": "keyword"}, "mapper_type": {"type": "keyword"}, + "rikolti": { + "properties": { + "indexed_at": {"type": "date"}, + "version_path": {"type": "keyword"}, + "page": {"type": "keyword"}, + } + }, "sort_date_start": {"type": "date"}, "sort_date_end": {"type": "date"}, diff --git a/record_indexer/index_templates/rikolti_template.py 
b/record_indexer/index_templates/rikolti_template.py index 1bf413ed..ab67b45d 100644 --- a/record_indexer/index_templates/rikolti_template.py +++ b/record_indexer/index_templates/rikolti_template.py @@ -25,6 +25,7 @@ def main(): # child schema == record schema, except without the "children" field child_schema = copy.deepcopy(record_schema) del child_schema["children"] + del child_schema["rikolti"] # create nested alias fields del child_schema["collection_id"] diff --git a/record_indexer/initialize/add_rikolti-stg_index.py b/record_indexer/initialize/add_rikolti-stg_index.py deleted file mode 100644 index 243d51b6..00000000 --- a/record_indexer/initialize/add_rikolti-stg_index.py +++ /dev/null @@ -1,83 +0,0 @@ -import argparse -import json -import sys -from datetime import datetime - -import requests - -from .. import settings -from ..utils import print_opensearch_error - -""" - Create a rikolti index and add the rikolti-stg alias to it - https://opensearch.org/docs/2.3/opensearch/index-templates/ -""" - - -def main(name=None): - # check if index with rikolti-stg alias exists - url = f"{settings.ENDPOINT}/_alias/rikolti-stg" - r = requests.get( - url, - auth=settings.get_auth(), - verify=settings.verify_certs() - ) - if r.status_code == 200: - print("Index with alias rikolti-stg already exists; OpenSearch " - "instance ready for use") - return - - # create index - if not name: - name = datetime.now().strftime("%Y%m%d%H%M%S") - - index_name = f"rikolti-{name}" - url = f"{settings.ENDPOINT}/{index_name}" - r = requests.put( - url, - headers={"Content-Type": "application/json"}, - auth=settings.get_auth(), - verify=settings.verify_certs() - ) - if not (200 <= r.status_code <= 299): - print_opensearch_error(r, url) - r.raise_for_status() - print(r.text) - - # add alias - url = f"{settings.ENDPOINT}/_aliases" - data = { - "actions": [ - {"add": {"index": index_name, "alias": "rikolti-stg"}} - ] - } - r = requests.post( - url, - headers={"Content-Type": "application/json"}, - data=json.dumps(data), - auth=settings.get_auth(), - verify=settings.verify_certs() - ) - if not (200 <= r.status_code <= 299): - print_opensearch_error(r, url) - r.raise_for_status() - print(r.text) - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description=( - "Creates an empty index at rikolti-; if no name " - "provided, uses the current timestamp. Adds the index to " - "the rikolti-stg alias." - ) - ) - parser.add_argument( - "-n", "--name", - help=( - "Optional name for the index; created index will be " - "rikolti-, if none provided, current timestamp will be used" - ) - ) - args = parser.parse_args() - sys.exit(main(args.name)) diff --git a/record_indexer/initialize/indices_and_aliases.py b/record_indexer/initialize/indices_and_aliases.py new file mode 100644 index 00000000..83a10f90 --- /dev/null +++ b/record_indexer/initialize/indices_and_aliases.py @@ -0,0 +1,116 @@ +import argparse +import json +import sys +from datetime import datetime + +import requests + +from .. 
import settings +from ..utils import print_opensearch_error + +""" + Create a rikolti index and add the rikolti-stg alias to it + https://opensearch.org/docs/2.3/opensearch/index-templates/ +""" + +def get_index_at_alias(alias): + url = f"{settings.ENDPOINT}/_alias/{alias}" + r = requests.get( + url, + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if r.status_code != 200: + print_opensearch_error(r, url) + r.raise_for_status() + + if len(r.json().keys()) != 1: + raise ValueError(f"Multiple indices found at alias {alias}") + + return r.json() + + +def create_index(index_name): + url = f"{settings.ENDPOINT}/{index_name}" + r = requests.put( + url, + headers={"Content-Type": "application/json"}, + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if not (200 <= r.status_code <= 299): + print_opensearch_error(r, url) + r.raise_for_status() + return r.json() + + +def add_alias(index_name, alias): + url = f"{settings.ENDPOINT}/_aliases" + data = { + "actions": [ + {"add": {"index": index_name, "alias": alias}} + ] + } + r = requests.post( + url, + headers={"Content-Type": "application/json"}, + data=json.dumps(data), + auth=settings.get_auth(), + verify=settings.verify_certs() + ) + if not (200 <= r.status_code <= 299): + print_opensearch_error(r, url) + r.raise_for_status() + return r.json() + + +def main(stg_name, prd_name): + try: + index = get_index_at_alias("rikolti-stg") + print(f"Index `{index}` with alias rikolti-stg already exists, " + "skipping creation") + except requests.HTTPError: + index_name = f"rikolti-{stg_name}" + resp = create_index(index_name) + print(resp) + resp = add_alias(index_name, "rikolti-stg") + print(resp) + + try: + index = get_index_at_alias("rikolti-prd") + print(f"Index `{index}` with alias rikolti-prd already exists, " + "skipping creation") + except requests.HTTPError: + index_name = f"rikolti-{prd_name}" + resp = create_index(index_name) + print(resp) + resp = add_alias(index_name, "rikolti-prd") + print(resp) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=( + "Creates an empty index at rikolti-; if no name " + "provided, uses the current timestamp. Adds the index to " + "the rikolti-stg alias." 
+ ) + ) + parser.add_argument( + "-s", "--stg-name", + help=( + "Optional name for the rikolti-stg index; created index will be " + "rikolti-, if none provided, current timestamp will be used" + ), + default=datetime.now().strftime("%Y%m%d%H%M%S") + ) + parser.add_argument( + "-p", "--prd-name", + help=( + "Optional name for the rikolti-prd index; created index will be " + "rikolti-, if none provided, current timestamp will be used" + ), + default=datetime.now().strftime("%Y%m%d%H%M%S") + ) + args = parser.parse_args() + sys.exit(main(args.stg_name, args.prd_name)) diff --git a/record_indexer/scripts/reindex_rikolti_stg.py b/record_indexer/scripts/reindex_rikolti_stg.py index 7295bafa..ebb87312 100644 --- a/record_indexer/scripts/reindex_rikolti_stg.py +++ b/record_indexer/scripts/reindex_rikolti_stg.py @@ -30,7 +30,7 @@ def get_aliased_indexes(self, alias): return aliased_indices - def reindex(self, source_index, destination_index): + def reindex(self, source_index, destination_index, script=None): ''' reindex all records from `source_index` into `destination_index` ''' @@ -39,6 +39,8 @@ def reindex(self, source_index, destination_index): "source": {"index": source_index}, "dest": {"index": destination_index} } + if script: + data['script'] = {"source": script, "lang": "painless"} print(f"Reindexing `{source_index}` into `{destination_index}`") resp = requests.post( url, diff --git a/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py b/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py new file mode 100644 index 00000000..0d75d582 --- /dev/null +++ b/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py @@ -0,0 +1,103 @@ +import time +import sys +import json + +from datetime import datetime +from urllib.parse import urlparse + +from opensearchpy import OpenSearch, RequestsHttpConnection +from ... import settings + +def get_client(): + if not settings.ENDPOINT: + raise ValueError("Missing OPENSEARCH_ENDPOINT in environment") + + host_url = urlparse(settings.ENDPOINT) + + client = OpenSearch( + hosts=[{ + 'host': host_url.hostname, + 'port': host_url.port or 443, + }], + http_auth=settings.get_auth(), + use_ssl=True, + verify_certs=settings.verify_certs(), + ssl_show_warn=settings.verify_certs(), + connection_class=RequestsHttpConnection, + ) + return client + + +def main(): + alias = "rikolti-stg" + os_client = get_client() + + # get name of index currently aliased to rikolti-stg + source_index = os_client.indices.get_alias(alias).keys() + if len(source_index) > 1: + raise ValueError(f"Alias `{alias}` has {len(source_index)} aliased " + "indices. 
There should be 1.") + source_index = list(source_index)[0] + + # create new index name + version = datetime.today().strftime("%Y%m%d%H%M%S") + destination_index = f"{alias}-{version}" + + # create migration script + indexed_at = datetime.now().isoformat() + + field = "rikolti" + initial_value = {"rikolti_value": { + "version_path": "initial", + "indexed_at": indexed_at, + "page": "unknown" + }} + script = { + "source": f"ctx._source.{field} = params.rikolti_value", + "params": initial_value, + "lang": "painless" + } + + # reindex data from source to destination index + task = os_client.reindex( + body={ + "source": {"index": source_index}, + "dest": {"index": destination_index}, + "script": script + }, + params={ + "wait_for_completion": "false", + "error_trace": "true" + } + ) + task_id = task.get('task') + print(f"{task_id=}") + + # poll task API until reindexing is complete + task_state = os_client.tasks.get(task_id) + while not task_state.get('completed'): + time.sleep(5) + task_state = os_client.tasks.get(task_id) + + print("Reindexing complete") + if "error" not in task_state: + print("Reindexing successful") + removal = os_client.indices.delete_alias(source_index, alias) + print(f"{removal=}") + addition = os_client.indices.put_alias(destination_index, alias) + print(f"{addition=}") + print( + f"'{alias}' alias removed from {source_index} and " + f"added to {destination_index}" + ) + print( + f"Please verify {destination_index} looks good and " + f"then delete {source_index}" + ) + else: + print("Reindexing failed") + print(json.dumps(task_state)) + +if __name__ == "__main__": + main() + sys.exit() \ No newline at end of file diff --git a/record_indexer/settings.py b/record_indexer/settings.py index 95842fd9..64c677d0 100644 --- a/record_indexer/settings.py +++ b/record_indexer/settings.py @@ -1,4 +1,6 @@ import os +import urllib3 +from urllib3.exceptions import InsecureRequestWarning from boto3 import Session from dotenv import load_dotenv @@ -10,7 +12,10 @@ es_pass = os.environ.get("OPENSEARCH_PASS") def verify_certs(): - return not os.environ.get("OPENSEARCH_IGNORE_TLS", False) + ignore_tls = os.environ.get("OPENSEARCH_IGNORE_TLS", False) + if ignore_tls: + urllib3.disable_warnings(InsecureRequestWarning) + return not ignore_tls def get_auth(): if es_user and es_pass: @@ -22,7 +27,6 @@ def get_auth(): return AWSV4SignerAuth( credentials, os.environ.get("AWS_REGION", "us-west-2")) - -ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT", False) +ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT") if ENDPOINT: ENDPOINT = ENDPOINT.rstrip("/") diff --git a/record_indexer/update_stage_index.py b/record_indexer/update_stage_index.py deleted file mode 100644 index ff68e974..00000000 --- a/record_indexer/update_stage_index.py +++ /dev/null @@ -1,59 +0,0 @@ -import json -import requests - -from .add_page_to_index import add_page -from . 
import settings -from .utils import print_opensearch_error - -def update_stage_index_for_collection(collection_id: str, version_pages: list[str]): - ''' update stage index with a new set of collection records ''' - index = get_index_for_alias("rikolti-stg") - - # delete existing records - delete_collection_records_from_index(collection_id, index) - - # add pages of records to index - for version_page in version_pages: - add_page(version_page, index) - -def get_index_for_alias(alias: str): - # for now, there should be only one index per alias (stage, prod) - url = f"{settings.ENDPOINT}/_alias/{alias}" - r = requests.get( - url, - auth=settings.get_auth(), - verify=settings.verify_certs() - ) - if not (200 <= r.status_code <= 299): - print_opensearch_error(r, url) - r.raise_for_status() - aliased_indices = [key for key in r.json().keys()] - if len(aliased_indices) != 1: - raise ValueError( - f"Alias `{alias}` has {len(aliased_indices)} aliased indices. There should be 1.") - else: - return aliased_indices[0] - -def delete_collection_records_from_index(collection_id: str, index: str): - url = f"{settings.ENDPOINT}/{index}/_delete_by_query" - headers = {"Content-Type": "application/json"} - - data = { - "query": { - "term": { - "collection_id": collection_id - } - } - } - - r = requests.post( - url=url, - data=json.dumps(data), - headers=headers, - auth=settings.get_auth(), - verify=settings.verify_certs() - ) - if not (200 <= r.status_code <= 299): - print_opensearch_error(r, url) - r.raise_for_status() - print(f"deleted records with collection_id `{collection_id}` from index `{index}`")
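After applying these changes, a quick way to sanity-check the result against the local Docker setup described in the README is sketched below. It assumes the dev credentials `admin`/`Rikolti_05`, the `https://localhost:9200` endpoint, and `curl`; `<collection_id>` is a placeholder for a real collection ID:

```
# each alias should resolve to exactly one index
curl -sku admin:Rikolti_05 https://localhost:9200/_alias/rikolti-stg
curl -sku admin:Rikolti_05 https://localhost:9200/_alias/rikolti-prd

# spot-check a collection's records, mirroring the dashboard query the indexer prints
curl -sku admin:Rikolti_05 -H "Content-Type: application/json" \
  https://localhost:9200/rikolti-stg/_search \
  -d '{"query": {"bool": {"filter": {"terms": {"collection_url": ["<collection_id>"]}}}}}'
```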