From b3641bff00b2336ccc68ba6e6b74e211d0fdb0bf Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Fri, 21 Jun 2024 14:03:33 -0700
Subject: [PATCH 01/15] Rename update_stage_index to index_collection

---
 dags/shared_tasks/indexing_tasks.py                | 4 ++--
 .../{update_stage_index.py => index_collection.py} | 9 ++++++---
 record_indexer/settings.py                         | 3 +--
 3 files changed, 9 insertions(+), 7 deletions(-)
 rename record_indexer/{update_stage_index.py => index_collection.py} (88%)

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index 19c937cb..146eed88 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -5,7 +5,7 @@
 from rikolti.dags.shared_tasks.shared import notify_rikolti_failure
 from rikolti.dags.shared_tasks.shared import send_event_to_sns
-from rikolti.record_indexer.update_stage_index import update_stage_index_for_collection
+from rikolti.record_indexer.index_collection import index_collection
 from rikolti.utils.versions import get_version
 
 
 @task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
@@ -18,7 +18,7 @@ def update_stage_index_for_collection_task(
             f"Collection ID not found in collection metadata: {collection}")
 
     try:
-        update_stage_index_for_collection(collection_id, version_pages)
+        index_collection("rikolti-stg", collection_id, version_pages)
     except Exception as e:
         # TODO: implement some rollback exception handling?
         raise e

diff --git a/record_indexer/update_stage_index.py b/record_indexer/index_collection.py
similarity index 88%
rename from record_indexer/update_stage_index.py
rename to record_indexer/index_collection.py
index ff68e974..c46a49da 100644
--- a/record_indexer/update_stage_index.py
+++ b/record_indexer/index_collection.py
@@ -5,9 +5,12 @@
 from . import settings
 from .utils import print_opensearch_error
 
 
-def update_stage_index_for_collection(collection_id: str, version_pages: list[str]):
-    ''' update stage index with a new set of collection records '''
-    index = get_index_for_alias("rikolti-stg")
+
+def index_collection(alias: str, collection_id: str, version_pages: list[str]):
+    '''
+    find 1 index at alias and update it with records from version_pages
+    '''
+    index = get_index_for_alias(alias)
 
     # delete existing records
     delete_collection_records_from_index(collection_id, index)

diff --git a/record_indexer/settings.py b/record_indexer/settings.py
index 95842fd9..87510b50 100644
--- a/record_indexer/settings.py
+++ b/record_indexer/settings.py
@@ -22,7 +22,6 @@ def get_auth():
     return AWSV4SignerAuth(
         credentials, os.environ.get("AWS_REGION", "us-west-2"))
 
-
-ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT", False)
+ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT")
 if ENDPOINT:
     ENDPOINT = ENDPOINT.rstrip("/")

From 23fca8adae9c1738d27ac2562c848607910a7159 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Fri, 21 Jun 2024 15:31:43 -0700
Subject: [PATCH 02/15] Indexer: Add rikolti.version_path, indexed_at, and page

---
 record_indexer/add_page_to_index.py                |  4 ++-
 record_indexer/index_collection.py                 | 28 +++++++++++++++----
 .../index_templates/record_index_config.py         |  7 +++++
 3 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/record_indexer/add_page_to_index.py b/record_indexer/add_page_to_index.py
index bd1686f2..e839b934 100644
--- a/record_indexer/add_page_to_index.py
+++ b/record_indexer/add_page_to_index.py
@@ -93,7 +93,7 @@ def get_expected_fields():
     return expected_fields
 
 
-def add_page(version_page: str, index: str):
+def add_page(version_page: str, index: str, rikolti_data: dict):
     if 'merged' in version_page:
         records = get_merged_page_content(version_page)
     else:
@@ -107,6 +107,8 @@
         for field in removed_fields:
             removed_fields_report[field].append(calisphere_id)
 
+        record['rikolti'] = dict(record.get('rikolti', {}), **rikolti_data)
+        record['rikolti']['page'] = version_page.split('/')[-1]
 
     bulk_add(records, index)

diff --git a/record_indexer/index_collection.py b/record_indexer/index_collection.py
index c46a49da..ae3b2588 100644
--- a/record_indexer/index_collection.py
+++ b/record_indexer/index_collection.py
@@ -1,9 +1,11 @@
 import json
 import requests
+from datetime import datetime
 
 from .add_page_to_index import add_page
 from . import settings
 from .utils import print_opensearch_error
+from rikolti.utils.versions import get_version
 
 
 def index_collection(alias: str, collection_id: str, version_pages: list[str]):
@@ -12,12 +14,19 @@ def index_collection(alias: str, collection_id: str, version_pages: list[str]):
     '''
     index = get_index_for_alias(alias)
 
-    # delete existing records
-    delete_collection_records_from_index(collection_id, index)
+    version_path = get_version(collection_id, version_pages[0])
+    rikolti_data = {
+        "version_path": version_path,
+        "indexed_at": datetime.now().isoformat(),
+    }
 
     # add pages of records to index
     for version_page in version_pages:
-        add_page(version_page, index)
+        add_page(version_page, index, rikolti_data)
+
+    # delete existing records
+    delete_collection_records_from_index(collection_id, index, version_path)
+
 
 def get_index_for_alias(alias: str):
     # for now, there should be only one index per alias (stage, prod)
@@ -37,14 +46,21 @@ def get_index_for_alias(alias: str):
     else:
         return aliased_indices[0]
 
-def delete_collection_records_from_index(collection_id: str, index: str):
+
+def delete_collection_records_from_index(
+        collection_id: str, index: str, version_path: str):
+    """
+    Delete records from index that have the same collection_id but an outdated
+    version_path
+    """
     url = f"{settings.ENDPOINT}/{index}/_delete_by_query"
     headers = {"Content-Type": "application/json"}
 
     data = {
         "query": {
-            "term": {
-                "collection_id": collection_id
+            "bool": {
+                "must": {"term": {"collection_id": collection_id}},
+                "must_not": {"term": {"rikolti.version_path": version_path}},
             }
         }
     }

diff --git a/record_indexer/index_templates/record_index_config.py b/record_indexer/index_templates/record_index_config.py
index 233f4810..98dc2d34 100644
--- a/record_indexer/index_templates/record_index_config.py
+++ b/record_indexer/index_templates/record_index_config.py
@@ -96,6 +96,13 @@
                 "url_item": {"type": "keyword"},
                 "fetcher_type": {"type": "keyword"},
                 "mapper_type": {"type": "keyword"},
+                "rikolti": {
+                    "properties": {
+                        "indexed_at": {"type": "date"},
+                        "version_path": {"type": "keyword"},
+                        "page": {"type": "keyword"},
+                    }
+                },
                 "sort_date_start": {"type": "date"},
                 "sort_date_end": {"type": "date"},
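Worth pausing on the query introduced above: the indexer now writes new documents first, each stamped with `rikolti.version_path`, and only afterward deletes documents for the collection whose version path differs. A minimal sketch of the resulting cleanup request — the collection id, version path, index name, endpoint, and credentials are all illustrative assumptions, not values from the patches:

```python
# Sketch of the versioned cleanup delete_collection_records_from_index
# performs: keep documents stamped with the just-indexed version_path,
# delete every other document in the collection.
import json
import requests

cleanup = {
    "query": {
        "bool": {
            "must": {"term": {"collection_id": "26098"}},  # illustrative id
            "must_not": {"term": {"rikolti.version_path": "26098/example-version/"}},
        }
    }
}
resp = requests.post(
    "https://localhost:9200/rikolti-stg-example/_delete_by_query",
    headers={"Content-Type": "application/json"},
    data=json.dumps(cleanup),
    auth=("admin", "Rikolti_05"),  # local dev credentials; an assumption
    verify=False,
)
print(resp.json().get("deleted", 0), "outdated documents removed")
```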
From a70d3a33dee06e47ffb8e31fc261d9f7450a7329 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Fri, 21 Jun 2024 15:32:09 -0700
Subject: [PATCH 03/15] Write a migration script for currently indexed content

---
 record_indexer/scripts/reindex_rikolti_stg.py  |   4 +++-
 .../schema_migrations/0_add_rikolti_fields.py  | 103 ++++++++++++++++++
 2 files changed, 106 insertions(+), 1 deletion(-)
 create mode 100644 record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py

diff --git a/record_indexer/scripts/reindex_rikolti_stg.py b/record_indexer/scripts/reindex_rikolti_stg.py
index 7295bafa..ebb87312 100644
--- a/record_indexer/scripts/reindex_rikolti_stg.py
+++ b/record_indexer/scripts/reindex_rikolti_stg.py
@@ -30,7 +30,7 @@ def get_aliased_indexes(self, alias):
 
         return aliased_indices
 
-    def reindex(self, source_index, destination_index):
+    def reindex(self, source_index, destination_index, script=None):
         '''
         reindex all records from `source_index` into `destination_index`
         '''
@@ -39,6 +39,8 @@
             "source": {"index": source_index},
             "dest": {"index": destination_index}
         }
+        if script:
+            data['script'] = {"source": script, "lang": "painless"}
         print(f"Reindexing `{source_index}` into `{destination_index}`")
         resp = requests.post(
             url,

diff --git a/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py b/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py
new file mode 100644
index 00000000..0d75d582
--- /dev/null
+++ b/record_indexer/scripts/schema_migrations/0_add_rikolti_fields.py
@@ -0,0 +1,103 @@
+import time
+import sys
+import json
+
+from datetime import datetime
+from urllib.parse import urlparse
+
+from opensearchpy import OpenSearch, RequestsHttpConnection
+from ... import settings
+
+
+def get_client():
+    if not settings.ENDPOINT:
+        raise ValueError("Missing OPENSEARCH_ENDPOINT in environment")
+
+    host_url = urlparse(settings.ENDPOINT)
+
+    client = OpenSearch(
+        hosts=[{
+            'host': host_url.hostname,
+            'port': host_url.port or 443,
+        }],
+        http_auth=settings.get_auth(),
+        use_ssl=True,
+        verify_certs=settings.verify_certs(),
+        ssl_show_warn=settings.verify_certs(),
+        connection_class=RequestsHttpConnection,
+    )
+    return client
+
+
+def main():
+    alias = "rikolti-stg"
+    os_client = get_client()
+
+    # get name of index currently aliased to rikolti-stg
+    source_index = os_client.indices.get_alias(alias).keys()
+    if len(source_index) > 1:
+        raise ValueError(f"Alias `{alias}` has {len(source_index)} aliased "
+                         "indices. There should be 1.")
+    source_index = list(source_index)[0]
+
+    # create new index name
+    version = datetime.today().strftime("%Y%m%d%H%M%S")
+    destination_index = f"{alias}-{version}"
+
+    # create migration script
+    indexed_at = datetime.now().isoformat()
+
+    field = "rikolti"
+    initial_value = {"rikolti_value": {
+        "version_path": "initial",
+        "indexed_at": indexed_at,
+        "page": "unknown"
+    }}
+    script = {
+        "source": f"ctx._source.{field} = params.rikolti_value",
+        "params": initial_value,
+        "lang": "painless"
+    }
+
+    # reindex data from source to destination index
+    task = os_client.reindex(
+        body={
+            "source": {"index": source_index},
+            "dest": {"index": destination_index},
+            "script": script
+        },
+        params={
+            "wait_for_completion": "false",
+            "error_trace": "true"
+        }
+    )
+    task_id = task.get('task')
+    print(f"{task_id=}")
+
+    # poll task API until reindexing is complete
+    task_state = os_client.tasks.get(task_id)
+    while not task_state.get('completed'):
+        time.sleep(5)
+        task_state = os_client.tasks.get(task_id)
+
+    print("Reindexing complete")
+    if "error" not in task_state:
+        print("Reindexing successful")
+        removal = os_client.indices.delete_alias(source_index, alias)
+        print(f"{removal=}")
+        addition = os_client.indices.put_alias(destination_index, alias)
+        print(f"{addition=}")
+        print(
+            f"'{alias}' alias removed from {source_index} and "
+            f"added to {destination_index}"
+        )
+        print(
+            f"Please verify {destination_index} looks good and "
+            f"then delete {source_index}"
+        )
+    else:
+        print("Reindexing failed")
+        print(json.dumps(task_state))
+
+
+if __name__ == "__main__":
+    main()
+    sys.exit()
\ No newline at end of file
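One quirk of the migration module above: its filename begins with a digit, so it cannot be pulled in with a plain `import` statement (module names in import syntax must be valid identifiers). A sketch of one way to invoke it programmatically, assuming the repository root is on `PYTHONPATH` and the intermediate packages are importable:

```python
# run_schema_migration.py -- hypothetical wrapper. importlib accepts dotted
# module paths that are not valid Python identifiers, unlike the import
# statement.
import importlib

migration = importlib.import_module(
    "record_indexer.scripts.schema_migrations.0_add_rikolti_fields"
)
migration.main()
```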
From 36387ea5a57b88337dd04391baac01fe68a09 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Mon, 24 Jun 2024 17:48:41 -0700
Subject: [PATCH 04/15] Add an opensearch dashboard (and required cluster) to docker-compose

---
 record_indexer/docker-compose.yml | 67 +++++++++++++++++++++++++++++--
 record_indexer/settings.py        |  7 +++-
 2 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/record_indexer/docker-compose.yml b/record_indexer/docker-compose.yml
index b4492e67..64933fcb 100644
--- a/record_indexer/docker-compose.yml
+++ b/record_indexer/docker-compose.yml
@@ -1,9 +1,68 @@
+---
+version: '3'
 services:
-  opensearch-node:
+  opensearch-node1:
     image: opensearchproject/opensearch:latest
+    container_name: opensearch-node1
+    environment:
+      - cluster.name=opensearch-cluster
+      - node.name=opensearch-node1
+      - discovery.seed_hosts=opensearch-node1,opensearch-node2
+      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
+      - bootstrap.memory_lock=true # along with the memlock settings below, disables swapping
+      - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m # minimum and maximum Java heap size, recommend setting both to 50% of system RAM
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05 # Sets the demo admin user password when using demo configuration, required for OpenSearch 2.12 and higher
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536 # maximum number of open files for the OpenSearch user, set to at least 65536 on modern systems
+        hard: 65536
+    volumes:
+      - opensearch-data1:/usr/share/opensearch/data
     ports:
-      - "9200:9200"
-      - "9600:9600"
+      - 9200:9200
+      - 9600:9600 # required for Performance Analyzer
+    networks:
+      - opensearch-net
+  opensearch-node2:
+    image: opensearchproject/opensearch:latest
+    container_name: opensearch-node2
     environment:
-      - "discovery.type=single-node"
+      - cluster.name=opensearch-cluster
+      - node.name=opensearch-node2
+      - discovery.seed_hosts=opensearch-node1,opensearch-node2
+      - cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2
+      - bootstrap.memory_lock=true
+      - OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05
+    ulimits:
+      memlock:
+        soft: -1
+        hard: -1
+      nofile:
+        soft: 65536
+        hard: 65536
+    volumes:
+      - opensearch-data2:/usr/share/opensearch/data
+    networks:
+      - opensearch-net
+  opensearch-dashboards:
+    image: opensearchproject/opensearch-dashboards:latest
+    container_name: opensearch-dashboards
+    ports:
+      - 5601:5601
+    expose:
+      - '5601'
+    environment:
+      OPENSEARCH_HOSTS: '["https://opensearch-node1:9200","https://opensearch-node2:9200"]'
+    networks:
+      - opensearch-net
+
+volumes:
+  opensearch-data1:
+  opensearch-data2:
+
+networks:
+  opensearch-net:

diff --git a/record_indexer/settings.py b/record_indexer/settings.py
index 87510b50..64c677d0 100644
--- a/record_indexer/settings.py
+++ b/record_indexer/settings.py
@@ -1,4 +1,6 @@
 import os
+import urllib3
+from urllib3.exceptions import InsecureRequestWarning
 
 from boto3 import Session
 from dotenv import load_dotenv
@@ -10,7 +12,10 @@
 es_pass = os.environ.get("OPENSEARCH_PASS")
 
 def verify_certs():
-    return not os.environ.get("OPENSEARCH_IGNORE_TLS", False)
+    ignore_tls = os.environ.get("OPENSEARCH_IGNORE_TLS", False)
+    if ignore_tls:
+        urllib3.disable_warnings(InsecureRequestWarning)
+    return not ignore_tls
 
 def get_auth():
     if es_user and es_pass:
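Before pointing the indexer at the two-node cluster defined above, it's worth confirming the stack is actually up. A small smoke test, assuming the demo admin password from the compose file and the cluster's self-signed dev certificates:

```python
# check_cluster.py -- hypothetical smoke test for the local dev cluster.
import requests

resp = requests.get(
    "https://localhost:9200",
    auth=("admin", "Rikolti_05"),  # OPENSEARCH_INITIAL_ADMIN_PASSWORD above
    verify=False,  # the demo config ships self-signed certificates
)
resp.raise_for_status()
print(resp.json()["version"]["distribution"])  # expect "opensearch"
```

Dashboards should then be reachable at http://localhost:5601 with the same credentials.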
From 641294422899618791fd9ba21d0143af53cc52b2 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Mon, 24 Jun 2024 18:38:18 -0700
Subject: [PATCH 05/15] Rename add_page_to_index to index_page

---
 record_indexer/index_collection.py                     | 4 ++--
 record_indexer/{add_page_to_index.py => index_page.py} | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
 rename record_indexer/{add_page_to_index.py => index_page.py} (98%)

diff --git a/record_indexer/index_collection.py b/record_indexer/index_collection.py
index ae3b2588..6356c749 100644
--- a/record_indexer/index_collection.py
+++ b/record_indexer/index_collection.py
@@ -2,7 +2,7 @@
 import requests
 from datetime import datetime
 
-from .add_page_to_index import add_page
+from .index_page import index_page
 from . import settings
 from .utils import print_opensearch_error
 from rikolti.utils.versions import get_version
@@ -22,7 +22,7 @@ def index_collection(alias: str, collection_id: str, version_pages: list[str]):
 
     # add pages of records to index
     for version_page in version_pages:
-        add_page(version_page, index, rikolti_data)
+        index_page(version_page, index, rikolti_data)
 
     # delete existing records
     delete_collection_records_from_index(collection_id, index, version_path)

diff --git a/record_indexer/add_page_to_index.py b/record_indexer/index_page.py
similarity index 98%
rename from record_indexer/add_page_to_index.py
rename to record_indexer/index_page.py
index e839b934..c5c7eb5a 100644
--- a/record_indexer/add_page_to_index.py
+++ b/record_indexer/index_page.py
@@ -93,7 +93,7 @@ def get_expected_fields():
     return expected_fields
 
 
-def add_page(version_page: str, index: str, rikolti_data: dict):
+def index_page(version_page: str, index: str, rikolti_data: dict):
     if 'merged' in version_page:
         records = get_merged_page_content(version_page)
     else:

From e566075fec35c84a05ee1195673e2f8f694405f7 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Mon, 24 Jun 2024 18:38:40 -0700
Subject: [PATCH 06/15] Don't need rikolti harvest data on child record

---
 record_indexer/index_templates/rikolti_template.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/record_indexer/index_templates/rikolti_template.py b/record_indexer/index_templates/rikolti_template.py
index 1bf413ed..ab67b45d 100644
--- a/record_indexer/index_templates/rikolti_template.py
+++ b/record_indexer/index_templates/rikolti_template.py
@@ -25,6 +25,7 @@ def main():
     # child schema == record schema, except without the "children" field
     child_schema = copy.deepcopy(record_schema)
     del child_schema["children"]
+    del child_schema["rikolti"]
 
     # create nested alias fields
     del child_schema["collection_id"]

From e2dcb72ce8440903a8296b162a3f4ed5b620aa4a Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 15:00:11 -0700
Subject: [PATCH 07/15] Update indexing/deletion process - refresh: true

---
 record_indexer/index_collection.py | 71 ++++++++++++++++++++++++------
 record_indexer/index_page.py       |  3 ++-
 2 files changed, 59 insertions(+), 15 deletions(-)

diff --git a/record_indexer/index_collection.py b/record_indexer/index_collection.py
index 6356c749..86576a09 100644
--- a/record_indexer/index_collection.py
+++ b/record_indexer/index_collection.py
@@ -1,6 +1,7 @@
 import json
 import requests
 from datetime import datetime
+from typing import Any
 
 from .index_page import index_page
 from . import settings
 from .utils import print_opensearch_error
@@ -22,6 +23,8 @@ def index_collection(alias: str, collection_id: str, version_pages: list[str]):
 
     # add pages of records to index
     for version_page in version_pages:
+        # index page of records - the index action creates a document if
+        # it doesn't exist, and replaces the document if it does
         index_page(version_page, index, rikolti_data)
 
     # delete existing records
@@ -47,15 +50,43 @@ def get_index_for_alias(alias: str):
         return aliased_indices[0]
 
 
+def get_outdated_versions(index: str, query: dict[str, Any]):
+    url = f"{settings.ENDPOINT}/{index}/_search"
+    headers = {"Content-Type": "application/json"}
+
+    data = dict(query, **{
+        "aggs": {
+            "version_paths": {
+                "terms": {
+                    "field": "rikolti.version_path",
+                    "size": 10
+                }
+            }
+        },
+        "track_total_hits": True,
+        "size": 0
+    })
+
+    r = requests.post(
+        url=url,
+        data=json.dumps(data),
+        headers=headers,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if not (200 <= r.status_code <= 299):
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+
+    return r.json()
+
+
 def delete_collection_records_from_index(
         collection_id: str, index: str, version_path: str):
     """
     Delete records from index that have the same collection_id but an outdated
     version_path
     """
-    url = f"{settings.ENDPOINT}/{index}/_delete_by_query"
-    headers = {"Content-Type": "application/json"}
-
     data = {
         "query": {
             "bool": {
                 "must": {"term": {"collection_id": collection_id}},
                 "must_not": {"term": {"rikolti.version_path": version_path}},
             }
         }
     }
 
-    r = requests.post(
-        url=url,
-        data=json.dumps(data),
-        headers=headers,
-        auth=settings.get_auth(),
-        verify=settings.verify_certs()
-    )
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    print(f"deleted records with collection_id `{collection_id}` from index `{index}`")
+    outdated = get_outdated_versions(index, data)
+    num_outdated_records = outdated.get(
+        'hits', {}).get('total', {}).get('value', 0)
+    outdated_versions = outdated.get(
+        'aggregations', {}).get('version_paths', {}).get('buckets')
+
+    if num_outdated_records > 0:
+        url = f"{settings.ENDPOINT}/{index}/_delete_by_query"
+        r = requests.post(
+            url=url,
+            data=json.dumps(data),
+            headers={"Content-Type": "application/json"},
+            auth=settings.get_auth(),
+            verify=settings.verify_certs()
+        )
+        if not (200 <= r.status_code <= 299):
+            print_opensearch_error(r, url)
+            r.raise_for_status()
+        print(f"deleted records with collection_id `{collection_id}` from index `{index}`")
+    else:
+        print(f"No outdated records found for collection {collection_id} in `{index}` index.")
+
+    return

diff --git a/record_indexer/index_page.py b/record_indexer/index_page.py
index c5c7eb5a..b4cf9636 100644
--- a/record_indexer/index_page.py
+++ b/record_indexer/index_page.py
@@ -22,6 +22,7 @@ def bulk_add(records: list, index: str):
         url,
         headers=headers,
         data=data,
+        params={"refresh": "true"},
         auth=settings.get_auth(),
         verify=settings.verify_certs()
     )
@@ -60,7 +61,7 @@ def build_bulk_request_body(records: list, index: str):
     for record in records:
         doc_id = record.get("id")
 
-        action = {"create": {"_index": index, "_id": doc_id}}
+        action = {"index": {"_index": index, "_id": doc_id}}
 
         body += f"{json.dumps(action)}\n{json.dumps(record)}\n"
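Two changes in this patch alter delivery semantics. `refresh: true` makes bulk-indexed documents searchable as soon as the request returns, which the follow-up delete-by-query depends on; and switching the bulk action from `create` to `index` turns re-indexing an existing `_id` into an overwrite rather than a version-conflict error. A sketch of the NDJSON pair each record now contributes to the bulk body (record and index names are illustrative):

```python
# Each record yields an action line plus a document line in the bulk body.
# The `index` action upserts by _id, where `create` would have failed on
# any _id already present in the index.
import json

record = {"id": "26098--00001", "title": ["Example record"]}
action = {"index": {"_index": "rikolti-stg-example", "_id": record["id"]}}
bulk_lines = f"{json.dumps(action)}\n{json.dumps(record)}\n"
print(bulk_lines)
```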
From 93780b514f76080b497cc166e536fb724654a082 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 15:01:34 -0700
Subject: [PATCH 08/15] Indexing reporting output; experiment with str formatting

---
 dags/shared_tasks/indexing_tasks.py |  6 ++++--
 record_indexer/index_collection.py  | 14 +++++++++++++-
 record_indexer/index_page.py        | 22 +++++++++++++-------
 3 files changed, 32 insertions(+), 10 deletions(-)

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index 146eed88..23820c58 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -27,12 +27,14 @@ def update_stage_index_for_collection_task(
     dashboard_query = {"query": {
         "bool": {"filter": {"terms": {"collection_url": [collection_id]}}}
     }}
+    hr = f"\n{'-'*40}\n"
+    end = f"\n{'~'*40}\n"
     print(
-        f"\n\nReview indexed records at: https://rikolti-data.s3.us-west-2."
+        f"{hr}Review indexed records at: \n https://rikolti-data.s3.us-west-2."
         f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n"
         f"Or on opensearch at: {os.environ.get('OPENSEARCH_ENDPOINT')}"
        "/_dashboards/app/dev_tools#/console with query:\n"
-        f"{json.dumps(dashboard_query, indent=2)}\n\n\n"
+        f"{json.dumps(dashboard_query, indent=2)}{end}"
     )
 
     send_event_to_sns(context, {'record_indexer_success': 'success'})

diff --git a/record_indexer/index_collection.py b/record_indexer/index_collection.py
index 86576a09..80911554 100644
--- a/record_indexer/index_collection.py
+++ b/record_indexer/index_collection.py
@@ -103,6 +103,18 @@ def delete_collection_records_from_index(
         'aggregations', {}).get('version_paths', {}).get('buckets')
 
     if num_outdated_records > 0:
+        hr = "\n" + "-"*40 + "\n"
+        end = "\n" + "~"*40 + "\n"
+        message = (
+            f"{hr}> Deleting {num_outdated_records} outdated record(s) from "
+            f"collection {collection_id} in `{index}` index.\n"
+            f"{'records':>8}: outdated versions\n"
+        )
+        for v in outdated_versions:
+            message += (f"{v.get('doc_count'):>8}: {v.get('key')}\n")
+        message += f"New indexed documents have version: {version_path}{end}"
+        print(message)
+
         url = f"{settings.ENDPOINT}/{index}/_delete_by_query"
         r = requests.post(
             url=url,
@@ -114,7 +126,7 @@
         if not (200 <= r.status_code <= 299):
             print_opensearch_error(r, url)
             r.raise_for_status()
-        print(f"deleted records with collection_id `{collection_id}` from index `{index}`")
+        print(f"{hr}> Deletion results:\n{json.dumps(r.json(), indent=2)}{end}")
     else:
         print(f"No outdated records found for collection {collection_id} in `{index}` index.")

diff --git a/record_indexer/index_page.py b/record_indexer/index_page.py
index b4cf9636..f3124476 100644
--- a/record_indexer/index_page.py
+++ b/record_indexer/index_page.py
@@ -113,16 +113,24 @@ def index_page(version_page: str, index: str, rikolti_data: dict):
 
     bulk_add(records, index)
 
-    print(
-        f"added {len(records)} records to index `{index}` from "
-        f"page `{version_page}`"
+    start = "\n" + "-"*40 + "\n"
+    end = "\n" + "~"*40 + "\n"
+
+    message = (
+        f"{start}Indexed {len(records)} records to index `{index}` from "
+        f"page `{version_page}`\n"
     )
     for field, calisphere_ids in removed_fields_report.items():
         if len(calisphere_ids) != len(records):
-            print(
-                f"    {len(calisphere_ids)} items had {field} "
-                f"removed: `{calisphere_ids}`"
+            message += (
+                f"{' '*5}{len(calisphere_ids)} items had {field} "
+                f"removed: `{calisphere_ids}`\n"
             )
         else:
-            print(f"    all {len(records)} records had {field} field removed")
+            message += (
+                f"{' '*5}all {len(records)} records had {field} field "
+                "removed\n"
+            )
+    message += end
+    print(message)

From 2f4045914a37de54c87ec19607094637f48524ad Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 15:13:24 -0700
Subject: [PATCH 09/15] Add calisphere-stage and version name to task output

---
 dags/shared_tasks/indexing_tasks.py | 24 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index 23820c58..155f47e7 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -29,12 +29,26 @@ def update_stage_index_for_collection_task(
     }}
     hr = f"\n{'-'*40}\n"
     end = f"\n{'~'*40}\n"
+    s3_url = (
+        "https://rikolti-data.s3.us-west-2.amazonaws.com/index.html#"
+        f"{version}/data/"
+    )
+    opensearch_url = (
+        f"{os.environ.get('OPENSEARCH_ENDPOINT')}/_dashboards/app/"
+        "dev_tools#/console"
+    )
+    calisphere_url = (
+        f"https://calisphere-stage.cdlib.org/collections/{collection_id}/"
+    )
+    print(
+        f"{hr}Review indexed records at:\n {s3_url}\n\n"
+        f"On opensearch at:\n {opensearch_url}\nwith query:\n"
+        f"{json.dumps(dashboard_query, indent=2)}\n\n"
+        f"On calisphere-stage at:\n {calisphere_url}\n{end}"
+    )
     print(
-        f"{hr}Review indexed records at: \n https://rikolti-data.s3.us-west-2."
-        f"amazonaws.com/index.html#{version.rstrip('/')}/data/ \n\n"
-        f"Or on opensearch at: {os.environ.get('OPENSEARCH_ENDPOINT')}"
-        "/_dashboards/app/dev_tools#/console with query:\n"
-        f"{json.dumps(dashboard_query, indent=2)}{end}"
+        f"{hr}Successfully published version:\n {version}\nto the "
+        f"stage index{end}"
     )
 
     send_event_to_sns(context, {'record_indexer_success': 'success'})

From 83ba28fdadfb95c4f2abf6e4efc2441aa194545a Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 15:14:18 -0700
Subject: [PATCH 10/15] Add index and version to registry message

---
 dags/shared_tasks/indexing_tasks.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index 155f47e7..c6d20248 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -51,4 +51,8 @@ def update_stage_index_for_collection_task(
         f"stage index{end}"
     )
 
-    send_event_to_sns(context, {'record_indexer_success': 'success'})
+    send_event_to_sns(context, {
+        'record_indexer_success': 'success',
+        'version': version,
+        'index': 'rikolti-stg'
+    })

From 7fa6b204078476f11b9806c3a9c3de32e2786d9f Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 16:00:20 -0700
Subject: [PATCH 11/15] Create publish_collection_task

---
 dags/dev_dags/index_to_stage_dag.py | 22 +++------
 dags/harvest_dag.py                 |  4 +--
 dags/shared_tasks/indexing_tasks.py | 54 ++++++++++++++++++++++-------
 3 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/dags/dev_dags/index_to_stage_dag.py b/dags/dev_dags/index_to_stage_dag.py
index a0995095..a4707ac2 100644
--- a/dags/dev_dags/index_to_stage_dag.py
+++ b/dags/dev_dags/index_to_stage_dag.py
@@ -1,25 +1,11 @@
 from datetime import datetime
 
-from airflow.decorators import dag, task
+from airflow.decorators import dag
 from airflow.models.param import Param
 
-from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task
+from rikolti.dags.shared_tasks.indexing_tasks import (
+    stage_collection_task, get_version_pages)
 from rikolti.dags.shared_tasks.shared import get_registry_data_task
-from rikolti.utils.versions import get_merged_pages, get_with_content_urls_pages
-
-
-@task(task_id="get_version_pages")
-def get_version_pages(params=None):
-    if not params or not params.get('version'):
-        raise ValueError("Version path not found in params")
-    version = params.get('version')
-
-    if 'merged' in version:
-        version_pages = get_merged_pages(version)
-    else:
-        version_pages = get_with_content_urls_pages(version)
-
-    return version_pages
 
 
 @dag(
@@ -36,6 +22,6 @@
 def index_collection_to_stage_dag():
     collection = get_registry_data_task()
     version_pages = get_version_pages()
-    index_name = update_stage_index_for_collection_task(collection, version_pages) # noqa F841
+    index_name = stage_collection_task(collection, version_pages) # noqa F841
 
 index_collection_to_stage_dag()
\ No newline at end of file

diff --git a/dags/harvest_dag.py b/dags/harvest_dag.py
index 9d572fa0..4231ec9b 100644
--- a/dags/harvest_dag.py
+++ b/dags/harvest_dag.py
@@ -18,7 +18,7 @@
     get_child_directories, get_with_content_urls_pages,
     get_with_content_urls_page_content, get_child_pages,
     create_merged_version, put_merged_page)
-from rikolti.dags.shared_tasks.indexing_tasks import update_stage_index_for_collection_task
+from rikolti.dags.shared_tasks.indexing_tasks import stage_collection_task
 
 
 def get_child_records(version, parent_id) -> list:
@@ -101,7 +101,7 @@ def harvest():
         collection, mapped_page_batches)
     merged_pages = merge_any_child_records_task(with_content_urls_version)
     merged_pages.set_upstream(content_harvest_task)
-    update_stage_index_for_collection_task(collection, merged_pages)
+    stage_collection_task(collection, merged_pages)
 
 
 harvest()
\ No newline at end of file

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index c6d20248..6816d0f2 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -6,19 +6,17 @@
 from rikolti.dags.shared_tasks.shared import notify_rikolti_failure
 from rikolti.dags.shared_tasks.shared import send_event_to_sns
 from rikolti.record_indexer.index_collection import index_collection
-from rikolti.utils.versions import get_version
-
-
-@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
-def update_stage_index_for_collection_task(
-    collection: dict, version_pages: list[str], **context):
+from rikolti.utils.versions import (
+    get_version, get_merged_pages, get_with_content_urls_pages)
 
+def index_collection_task(alias, collection, version_pages, context):
     collection_id = collection.get('id')
     if not collection_id:
         raise ValueError(
             f"Collection ID not found in collection metadata: {collection}")
 
     try:
-        index_collection("rikolti-stg", collection_id, version_pages)
+        index_collection(alias, collection_id, version_pages)
     except Exception as e:
         # TODO: implement some rollback exception handling?
         raise e
@@ -37,22 +35,54 @@
         f"{os.environ.get('OPENSEARCH_ENDPOINT')}/_dashboards/app/"
         "dev_tools#/console"
     )
-    calisphere_url = (
-        f"https://calisphere-stage.cdlib.org/collections/{collection_id}/"
-    )
+    calisphere_url = f"/collections/{collection_id}/"
+    if alias == 'rikolti-prd':
+        calisphere_url = f"https://calisphere.org/{calisphere_url}"
+    else:
+        calisphere_url = f"https://calisphere-stage.cdlib.org/{calisphere_url}"
+
     print(
         f"{hr}Review indexed records at:\n {s3_url}\n\n"
         f"On opensearch at:\n {opensearch_url}\nwith query:\n"
+        f"GET /{alias}/_search\n"
         f"{json.dumps(dashboard_query, indent=2)}\n\n"
         f"On calisphere-stage at:\n {calisphere_url}\n{end}"
     )
+    verbed = "published" if alias == 'rikolti-prd' else "staged"
     print(
-        f"{hr}Successfully published version:\n {version}\nto the "
-        f"stage index{end}"
+        f"{hr}Successfully {verbed} version:\n {version}\nto the "
+        f"`{alias}` index{end}"
     )
 
     send_event_to_sns(context, {
         'record_indexer_success': 'success',
         'version': version,
-        'index': 'rikolti-stg'
+        'index': alias
     })
+
+
+@task(task_id="get_version_pages")
+def get_version_pages(params=None):
+    if not params or not params.get('version'):
+        raise ValueError("Version path not found in params")
+    version = params.get('version')
+
+    if 'merged' in version:
+        version_pages = get_merged_pages(version)
+    else:
+        version_pages = get_with_content_urls_pages(version)
+
+    return version_pages
+
+
+@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
+def stage_collection_task(
+        collection: dict, version_pages: list[str], **context):
+
+    index_collection_task("rikolti-stg", collection, version_pages, context)
+
+
+@task(task_id="publish_collection", on_failure_callback=notify_rikolti_failure)
+def publish_collection_task(
+        collection: dict, version_pages: list[str], **context):
+
+    index_collection_task("rikolti-prd", collection, version_pages, context)
\ No newline at end of file

From cc62ec4537f43c210b036364a81e264ec437eaf1 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 16:09:18 -0700
Subject: [PATCH 12/15] Create Publish Collection DAG

---
 dags/publish_dag.py | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)
 create mode 100644 dags/publish_dag.py

diff --git a/dags/publish_dag.py b/dags/publish_dag.py
new file mode 100644
index 00000000..2cfa6dcb
--- /dev/null
+++ b/dags/publish_dag.py
@@ -0,0 +1,31 @@
+from datetime import datetime
+
+from airflow.decorators import dag
+from airflow.models.param import Param
+
+from rikolti.dags.shared_tasks.indexing_tasks import (
+    publish_collection_task, get_version_pages)
+from rikolti.dags.shared_tasks.shared import get_registry_data_task
+from rikolti.dags.shared_tasks.shared import notify_dag_success
+from rikolti.dags.shared_tasks.shared import notify_dag_failure
+
+
+@dag(
+    dag_id="publish_collection",
+    schedule=None,
+    start_date=datetime(2023, 1, 1),
+    catchup=False,
+    params={
+        'collection_id': Param(None, description="Collection ID to publish"),
+        'version': Param(None, description="Version path to publish")
+    },
+    tags=["rikolti"],
+    on_failure_callback=notify_dag_failure,
+    on_success_callback=notify_dag_success,
+)
+def publish_collection_dag():
+    collection = get_registry_data_task()
+    version_pages = get_version_pages()
+    publish_collection_task(collection, version_pages)
+
+publish_collection_dag()
\ No newline at end of file
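Once the DAG above is deployed, a publish run just needs a `collection_id` and `version` in its run configuration. A sketch of triggering it over Airflow's stable REST API — the host, credentials, and conf values are all assumptions:

```python
# trigger_publish.py -- hypothetical trigger for the publish_collection DAG.
import requests

resp = requests.post(
    "http://localhost:8080/api/v1/dags/publish_collection/dagRuns",
    auth=("airflow", "airflow"),  # assumed local development credentials
    json={"conf": {
        "collection_id": "26098",             # illustrative id
        "version": "26098/example-version/",  # illustrative version path
    }},
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```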
From 04133adf05807e7c8b9850181adbba1728e0f3f1 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 25 Jun 2024 17:23:37 -0700
Subject: [PATCH 13/15] Add rikolti-prd index to initialization script

---
 record_indexer/README.md                    |   4 ++--
 .../initialize/add_rikolti-stg_index.py     |  83 ------------
 .../initialize/indices_and_aliases.py       | 116 ++++++++++++++++++
 3 files changed, 118 insertions(+), 85 deletions(-)
 delete mode 100644 record_indexer/initialize/add_rikolti-stg_index.py
 create mode 100644 record_indexer/initialize/indices_and_aliases.py

diff --git a/record_indexer/README.md b/record_indexer/README.md
index 14a62c8b..861947d1 100644
--- a/record_indexer/README.md
+++ b/record_indexer/README.md
@@ -56,10 +56,10 @@ python -m record_indexer.index_templates.rikolti_template
 ```
 
 This creates a record template that will be used for adding documents to any index with a name matching `rikolti*` that is added to the cluster.
 
-2. Create an index and add it to the alias `rikolti-stg`
+2. Create initial stage and prod indices and add the aliases `rikolti-stg` and `rikolti-prd` to each, respectively:
 
 ```
-python -m record_indexer.initialize.add_rikolti-stg_index
+python -m record_indexer.initialize.indices_and_aliases
 ```
 
 This creates an empty index named `rikolti-<name>` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.

diff --git a/record_indexer/initialize/add_rikolti-stg_index.py b/record_indexer/initialize/add_rikolti-stg_index.py
deleted file mode 100644
index 243d51b6..00000000
--- a/record_indexer/initialize/add_rikolti-stg_index.py
+++ /dev/null
@@ -1,83 +0,0 @@
-import argparse
-import json
-import sys
-from datetime import datetime
-
-import requests
-
-from .. import settings
-from ..utils import print_opensearch_error
-
-"""
-    Create a rikolti index and add the rikolti-stg alias to it
-    https://opensearch.org/docs/2.3/opensearch/index-templates/
-"""
-
-
-def main(name=None):
-    # check if index with rikolti-stg alias exists
-    url = f"{settings.ENDPOINT}/_alias/rikolti-stg"
-    r = requests.get(
-        url,
-        auth=settings.get_auth(),
-        verify=settings.verify_certs()
-    )
-    if r.status_code == 200:
-        print("Index with alias rikolti-stg already exists; OpenSearch "
-              "instance ready for use")
-        return
-
-    # create index
-    if not name:
-        name = datetime.now().strftime("%Y%m%d%H%M%S")
-
-    index_name = f"rikolti-{name}"
-    url = f"{settings.ENDPOINT}/{index_name}"
-    r = requests.put(
-        url,
-        headers={"Content-Type": "application/json"},
-        auth=settings.get_auth(),
-        verify=settings.verify_certs()
-    )
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    print(r.text)
-
-    # add alias
-    url = f"{settings.ENDPOINT}/_aliases"
-    data = {
-        "actions": [
-            {"add": {"index": index_name, "alias": "rikolti-stg"}}
-        ]
-    }
-    r = requests.post(
-        url,
-        headers={"Content-Type": "application/json"},
-        data=json.dumps(data),
-        auth=settings.get_auth(),
-        verify=settings.verify_certs()
-    )
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    print(r.text)
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(
-        description=(
-            "Creates an empty index at rikolti-<name>; if no name "
-            "provided, uses the current timestamp. Adds the index to "
-            "the rikolti-stg alias."
-        )
-    )
-    parser.add_argument(
-        "-n", "--name",
-        help=(
-            "Optional name for the index; created index will be "
-            "rikolti-<name>, if none provided, current timestamp will be used"
-        )
-    )
-    args = parser.parse_args()
-    sys.exit(main(args.name))

diff --git a/record_indexer/initialize/indices_and_aliases.py b/record_indexer/initialize/indices_and_aliases.py
new file mode 100644
index 00000000..83a10f90
--- /dev/null
+++ b/record_indexer/initialize/indices_and_aliases.py
@@ -0,0 +1,116 @@
+import argparse
+import json
+import sys
+from datetime import datetime
+
+import requests
+
+from .. import settings
+from ..utils import print_opensearch_error
+
+"""
+    Create a rikolti index and add the rikolti-stg alias to it
+    https://opensearch.org/docs/2.3/opensearch/index-templates/
+"""
+
+
+def get_index_at_alias(alias):
+    url = f"{settings.ENDPOINT}/_alias/{alias}"
+    r = requests.get(
+        url,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if r.status_code != 200:
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+
+    if len(r.json().keys()) != 1:
+        raise ValueError(f"Multiple indices found at alias {alias}")
+
+    return r.json()
+
+
+def create_index(index_name):
+    url = f"{settings.ENDPOINT}/{index_name}"
+    r = requests.put(
+        url,
+        headers={"Content-Type": "application/json"},
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if not (200 <= r.status_code <= 299):
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+    return r.json()
+
+
+def add_alias(index_name, alias):
+    url = f"{settings.ENDPOINT}/_aliases"
+    data = {
+        "actions": [
+            {"add": {"index": index_name, "alias": alias}}
+        ]
+    }
+    r = requests.post(
+        url,
+        headers={"Content-Type": "application/json"},
+        data=json.dumps(data),
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if not (200 <= r.status_code <= 299):
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+    return r.json()
+
+
+def main(stg_name, prd_name):
+    try:
+        index = get_index_at_alias("rikolti-stg")
+        print(f"Index `{index}` with alias rikolti-stg already exists, "
+              "skipping creation")
+    except requests.HTTPError:
+        index_name = f"rikolti-{stg_name}"
+        resp = create_index(index_name)
+        print(resp)
+        resp = add_alias(index_name, "rikolti-stg")
+        print(resp)
+
+    try:
+        index = get_index_at_alias("rikolti-prd")
+        print(f"Index `{index}` with alias rikolti-prd already exists, "
+              "skipping creation")
+    except requests.HTTPError:
+        index_name = f"rikolti-{prd_name}"
+        resp = create_index(index_name)
+        print(resp)
+        resp = add_alias(index_name, "rikolti-prd")
+        print(resp)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=(
+            "Creates an empty index at rikolti-<name>; if no name "
+            "provided, uses the current timestamp. Adds the index to "
+            "the rikolti-stg alias."
+        )
+    )
+    parser.add_argument(
+        "-s", "--stg-name",
+        help=(
+            "Optional name for the rikolti-stg index; created index will be "
+            "rikolti-<name>, if none provided, current timestamp will be used"
+        ),
+        default=datetime.now().strftime("%Y%m%d%H%M%S")
+    )
+    parser.add_argument(
+        "-p", "--prd-name",
+        help=(
+            "Optional name for the rikolti-prd index; created index will be "
+            "rikolti-<name>, if none provided, current timestamp will be used"
+        ),
+        default=datetime.now().strftime("%Y%m%d%H%M%S")
+    )
+    args = parser.parse_args()
+    sys.exit(main(args.stg_name, args.prd_name))
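After initialization, each alias should resolve to exactly one index. A quick check — a sketch assuming the local dev endpoint and demo credentials:

```python
# check_aliases.py -- hypothetical post-initialization sanity check.
import requests

for alias in ("rikolti-stg", "rikolti-prd"):
    resp = requests.get(
        f"https://localhost:9200/_alias/{alias}",
        auth=("admin", "Rikolti_05"),
        verify=False,  # local dev cluster only
    )
    resp.raise_for_status()
    print(alias, "->", list(resp.json().keys()))  # expect one index per alias
```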
From d1ab7b4bffd119feeb648274a37629e7aeaf3e87 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Thu, 27 Jun 2024 14:21:02 -0700
Subject: [PATCH 14/15] Assigned indexing tasks to rikolti_opensearch_pool & updated docs

---
 README.md                           | 16 ++++++++++------
 dags/shared_tasks/indexing_tasks.py | 21 ++++++++++++++-------
 record_indexer/README.md            | 14 +++++++++++---
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index 4602d025..0a9e3dbb 100644
--- a/README.md
+++ b/README.md
@@ -205,21 +205,25 @@ If you would like to mount your own codebase to the content_harvester container
 export MOUNT_CODEBASE=
 ```
 
-In order to run the indexer code, make sure the following variables are set:
+In order to run the indexer code against an AWS hosted OpenSearch, make sure the following variables are set in startup.sh:
 
 ```
 export OPENSEARCH_ENDPOINT= # ask for endpoint url
-```
-
-Also make sure to set your temporary AWS credentials and the region so that the mwaa-local-runner container can authenticate when talking to the OpenSearch API:
-
-```
 export AWS_ACCESS_KEY_ID=
 export AWS_SECRET_ACCESS_KEY=
 export AWS_SESSION_TOKEN=
 export AWS_REGION=us-west-2
 ```
 
+If using a local Docker container to run a dev OpenSearch, set the following variables in startup.sh:
+
+```
+export OPENSEARCH_ENDPOINT=https://host.docker.internal:9200/
+export OPENSEARCH_USER=admin
+export OPENSEARCH_PASS="Rikolti_05"
+export OPENSEARCH_IGNORE_TLS=True
+```
+
 Finally, from inside the aws-mwaa-local-runner repo, run `./mwaa-local-env build-image` to build the docker image, and `./mwaa-local-env start` to start the mwaa local environment.
 
 For more information on `mwaa-local-env`, look for instructions in the [ucldc/aws-mwaa-local-runner:README](https://github.com/ucldc/aws-mwaa-local-runner/#readme) to build the docker image, run the container, and do local development.

diff --git a/dags/shared_tasks/indexing_tasks.py b/dags/shared_tasks/indexing_tasks.py
index 6816d0f2..675ae8bd 100644
--- a/dags/shared_tasks/indexing_tasks.py
+++ b/dags/shared_tasks/indexing_tasks.py
@@ -32,21 +32,22 @@ def index_collection_task(alias, collection, version_pages, context):
         f"{version}/data/"
     )
     opensearch_url = (
-        f"{os.environ.get('OPENSEARCH_ENDPOINT')}/_dashboards/app/"
-        "dev_tools#/console"
+        f"{os.environ.get('OPENSEARCH_ENDPOINT', '').rstrip('/')}/"
+        "_dashboards/app/dev_tools#/console"
     )
     calisphere_url = f"/collections/{collection_id}/"
+    print(alias)
     if alias == 'rikolti-prd':
-        calisphere_url = f"https://calisphere.org/{calisphere_url}"
+        calisphere_url = f"https://calisphere.org{calisphere_url}"
     else:
-        calisphere_url = f"https://calisphere-stage.cdlib.org/{calisphere_url}"
+        calisphere_url = f"https://calisphere-stage.cdlib.org{calisphere_url}"
 
     print(
         f"{hr}Review indexed records at:\n {s3_url}\n\n"
         f"On opensearch at:\n {opensearch_url}\nwith query:\n"
         f"GET /{alias}/_search\n"
         f"{json.dumps(dashboard_query, indent=2)}\n\n"
-        f"On calisphere-stage at:\n {calisphere_url}\n{end}"
+        f"On calisphere at:\n {calisphere_url}\n{end}"
     )
     verbed = "published" if alias == 'rikolti-prd' else "staged"
     print(
@@ -74,14 +75,20 @@ def get_version_pages(params=None):
     return version_pages
 
 
-@task(task_id="create_stage_index", on_failure_callback=notify_rikolti_failure)
+@task(
+    task_id="create_stage_index",
+    on_failure_callback=notify_rikolti_failure,
+    pool="rikolti_opensearch_pool")
 def stage_collection_task(
         collection: dict, version_pages: list[str], **context):
 
     index_collection_task("rikolti-stg", collection, version_pages, context)
 
 
-@task(task_id="publish_collection", on_failure_callback=notify_rikolti_failure)
+@task(
+    task_id="publish_collection",
+    on_failure_callback=notify_rikolti_failure,
+    pool="rikolti_opensearch_pool")
 def publish_collection_task(
         collection: dict, version_pages: list[str], **context):
 
     index_collection_task("rikolti-prd", collection, version_pages, context)

diff --git a/record_indexer/README.md b/record_indexer/README.md
index 861947d1..4b5d3101 100644
--- a/record_indexer/README.md
+++ b/record_indexer/README.md
@@ -6,9 +6,13 @@
 Records must adhere strictly to the fields specified in [our index template](index_templates/record_index_config.py). Our `record_indexer` component is designed to remove any fields that are not in our index template. The `record_indexer` indexes records by collection into indices identified by aliases.
 
-## Configuring the Record Indexer - AWS and Docker Options
+## Configuring the Record Indexer
 
-The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT` - the API endpoint for an opensearch instance. Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container
+The Record Indexer indexes records by hitting the configured `OPENSEARCH_ENDPOINT` - the API endpoint for an opensearch instance.
+
+Rikolti requires that all requests made to the OpenSearch API via Airflow be pooled in order to throttle concurrent reindexing requests. All Airflow tasks making requests to the OpenSearch index have `pool="rikolti_opensearch_pool"`. These tasks will not run in your Airflow instance until you create the pool using the Airflow UI, by navigating to Admin > Pools, then creating a new pool called `rikolti_opensearch_pool` with at least 1 Slot.
+
+Rikolti supports authenticating against an AWS hosted OpenSearch endpoint (via IAM permissioning and/or `AWS_*` environment variables) or using basic auth against a dev OpenSearch Docker container.
 
 ### AWS Hosted OpenSearch
 If you're trying to set up the record_indexer to communicate with an AWS hosted OpenSearch instance, set the `OPENSEARCH_ENDPOINT` to the AWS-provided endpoint. Make sure your machine or your AWS account has access, and, if relevant, set the following environment variables: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_SESSION_TOKEN`, `AWS_REGION`.
@@ -46,6 +50,8 @@ export OPENSEARCH_ENDPOINT=https://localhost:9200/
 
 ## Initializing an OpenSearch instance to work with Rikolti
 
+These initialization instructions must be run for either a new AWS Hosted OpenSearch domain, or a new local Docker Container.
+
 Make sure that OPENSEARCH_ENDPOINT and the relevant authentication is set in your environment.
 
 1. Create an index template for rikolti:
@@ -56,10 +62,12 @@ python -m record_indexer.index_templates.rikolti_template
 
 This creates a record template that will be used for adding documents to any index with a name matching `rikolti*` that is added to the cluster.
 
 2. Create initial stage and prod indices and add the aliases `rikolti-stg` and `rikolti-prd` to each, respectively:
 
 ```
 python -m record_indexer.initialize.indices_and_aliases
 ```
 
-This creates an empty index named `rikolti-<name>` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.
+This creates 2 empty indices named `rikolti-<name>` (enforcing the use of the rikolti_template for all records indexed into them) and assigns them to the aliases `rikolti-stg` and `rikolti-prd`, respectively.
+
+You can provide your own index name by using the `-s`/`--stg-name` or `-p`/`--prd-name` options. The script will prefix whatever name you provide with `rikolti-`.
 
 ## Running the Record Indexer

From 60ab54c940e21fd0860eb97d779e2f27d4c20792 Mon Sep 17 00:00:00 2001
From: amy wieliczka
Date: Tue, 9 Jul 2024 12:20:53 -0500
Subject: [PATCH 15/15] Add lowercase_trim normalizer to type.raw field

---
 record_indexer/index_templates/record_index_config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/record_indexer/index_templates/record_index_config.py b/record_indexer/index_templates/record_index_config.py
index 98dc2d34..a023f53f 100644
--- a/record_indexer/index_templates/record_index_config.py
+++ b/record_indexer/index_templates/record_index_config.py
@@ -68,7 +68,7 @@
                 "spatial": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}},
                 "subject": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}},
                 "temporal": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}},
-                "type": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}},
+                "type": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword", "normalizer": "lowercase_trim"}}},
                 "sort_title": {"type": "keyword", "normalizer": "lowercase_trim"},
                 "facet_decade": {"type": "text", "analyzer": "asciifolded_english", "fields": {"raw": {"type": "keyword"}}},
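With the series applied, staging and publishing both flow through `index_collection`, and every indexed document carries a `rikolti` provenance block. A closing sketch of verifying which version of a collection is live after a run — the endpoint, credentials, and collection id are assumptions:

```python
# verify_version.py -- hypothetical post-run check built on the new
# rikolti.version_path keyword field.
import json
import requests

query = {
    "size": 0,
    "query": {"term": {"collection_id": "26098"}},  # illustrative id
    "aggs": {"live_versions": {"terms": {"field": "rikolti.version_path"}}},
}
resp = requests.get(
    "https://localhost:9200/rikolti-stg/_search",
    headers={"Content-Type": "application/json"},
    data=json.dumps(query),
    auth=("admin", "Rikolti_05"),
    verify=False,  # local dev cluster only
)
buckets = resp.json()["aggregations"]["live_versions"]["buckets"]
print(json.dumps(buckets, indent=2))  # expect a single current version_path
```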