diff --git a/README.md b/README.md
index 7b4cdc87a..4602d0251 100644
--- a/README.md
+++ b/README.md
@@ -208,8 +208,7 @@ export MOUNT_CODEBASE=
 By default, OpenSearch uses self-signed TLS certificates. The `-k` short option skips the certificate verification step so requests don't fail:
 ```
-python -m record_indexer.index_templates.rikolti_template
+curl -X GET "https://localhost:9200/_cat/indices" -ku admin:Rikolti_05
+```
+
+To use this docker container with the record indexer, you will have to configure:
+
+```
+export OPENSEARCH_USER=admin
+export OPENSEARCH_PASS=Rikolti_05
+export OPENSEARCH_IGNORE_TLS=True
+```
+
+**To connect to this OpenSearch docker container from mwaa-local-runner, set the previous values and the below endpoint in dags/startup.sh:**
+
+```
+export OPENSEARCH_ENDPOINT=https://host.docker.internal:9200/
+```
+
+**To use this OpenSearch docker container from your host machine, set the previous values and the below endpoint in env.local:**
+
+```
+export OPENSEARCH_ENDPOINT=https://localhost:9200/
 ```
-This creates a template that will be used whenever an index with name matching `rikolti*` is added to the cluster.
+## Initializing an OpenSearch instance to work with Rikolti
 
-## Run indexer from command line
+Make sure that OPENSEARCH_ENDPOINT and the relevant authentication values are set in your environment.
 
-Create a new index for a collection and add it to the `rikolti-stg` alias:
+1. Create an index template for rikolti:
 
 ```
-python -m record_indexer.create_collection_index
+python -m record_indexer.index_templates.rikolti_template
 ```
 
-Add the current stage index for a collection to the `rikolti-prd` alias:
+This creates a record template that will be applied to documents added to any index whose name matches `rikolti*`.
+
+2. Create an index and add it to the alias `rikolti-stg`:
 
 ```
-python -m record_indexer.move_index_to_prod
+python -m record_indexer.initialize.add_rikolti-stg_index
 ```
 
+This creates an empty index named `rikolti-<name>` (enforcing the use of the rikolti_template for all records indexed into it) and assigns it to the alias `rikolti-stg`.
+
+## Running the Record Indexer
+
+TODO: We don't currently support running the indexer from the command line.
+
 ## Indexer development using aws-mwaa-local-runner
 
 See the Rikolti README page section on [Airflow Development](https://github.com/ucldc/rikolti/#airflow-development). In particular, make sure that indexer-related env vars are set as described there.
 
 ## Index lifecycle
 
+TODO: this section is all pretty defunct now; update documentation
+
 The lifecycle of an index is as follows:
 
 #### Create new index
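The configuration documented above can also be smoke-tested from Python. The following is a minimal sketch, equivalent to the `curl` command in the README, assuming the env vars just documented (`OPENSEARCH_ENDPOINT`, `OPENSEARCH_USER`, `OPENSEARCH_PASS`, `OPENSEARCH_IGNORE_TLS`) are exported:

```python
# Sketch: connectivity check against the local OpenSearch container,
# equivalent to `curl -X GET "https://localhost:9200/_cat/indices" -ku ...`.
import os

import requests

endpoint = os.environ.get("OPENSEARCH_ENDPOINT", "https://localhost:9200").rstrip("/")
auth = (
    os.environ.get("OPENSEARCH_USER", "admin"),
    os.environ.get("OPENSEARCH_PASS", "Rikolti_05"),
)
# OPENSEARCH_IGNORE_TLS=True -> skip verification of the self-signed cert
verify = not os.environ.get("OPENSEARCH_IGNORE_TLS", False)

r = requests.get(f"{endpoint}/_cat/indices", auth=auth, verify=verify)
r.raise_for_status()
print(r.text)
```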
diff --git a/record_indexer/add_page_to_index.py b/record_indexer/add_page_to_index.py
index 35d9123de..bd1686f22 100644
--- a/record_indexer/add_page_to_index.py
+++ b/record_indexer/add_page_to_index.py
@@ -18,7 +18,13 @@
 def bulk_add(records: list, index: str):
     headers = {"Content-Type": "application/json"}
 
-    r = requests.post(url, headers=headers, data=data, auth=settings.get_auth())
+    r = requests.post(
+        url,
+        headers=headers,
+        data=data,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
     if not (200 <= r.status_code <= 299):
         print_opensearch_error(r, url)
         r.raise_for_status()
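The `verify=settings.verify_certs()` keyword added above is the pattern this diff repeats for every OpenSearch request in the record indexer. If the repetition grows, a small wrapper could centralize it; here is a sketch using a hypothetical `os_request` helper that is not part of this diff:

```python
# Hypothetical helper (not in this diff): one place to attach the auth
# and TLS-verification settings that every OpenSearch request needs.
import requests

from record_indexer import settings


def os_request(method: str, url: str, **kwargs) -> requests.Response:
    kwargs.setdefault("auth", settings.get_auth())
    kwargs.setdefault("verify", settings.verify_certs())
    return requests.request(method, url, **kwargs)
```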
diff --git a/record_indexer/create_collection_index.py b/record_indexer/create_collection_index.py
deleted file mode 100644
index d7ed79cea..000000000
--- a/record_indexer/create_collection_index.py
+++ /dev/null
@@ -1,128 +0,0 @@
-import argparse
-from datetime import datetime
-import json
-import sys
-
-import requests
-
-from .add_page_to_index import add_page
-from . import settings
-from .utils import print_opensearch_error
-from rikolti.utils.versions import (
-    get_merged_pages, get_with_content_urls_pages)
-
-
-def update_alias_for_collection(alias: str, collection_id: str, index: str):
-    remove_collection_indices_from_alias(alias, collection_id)
-
-    url = f"{settings.ENDPOINT}/_aliases"
-    headers = {"Content-Type": "application/json"}
-
-    data = {"actions": [{"add": {"index": index, "alias": alias}}]}
-
-    r = requests.post(
-        url, headers=headers, data=json.dumps(data), auth=settings.get_auth())
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    print(f"added index `{index}` to alias `{alias}`")
-
-
-def remove_collection_indices_from_alias(alias: str, collection_id: str):
-    url = f"{settings.ENDPOINT}/_alias/{alias}"
-    r = requests.get(url=url, auth=settings.get_auth())
-    if r.status_code == 404:
-        return
-    else:
-        if not (200 <= r.status_code <= 299):
-            print_opensearch_error(r, url)
-            r.raise_for_status()
-    indices = json.loads(r.text)
-    indices_to_remove = [
-        key for key in indices if key.startswith(f"rikolti-{collection_id}-")
-    ]
-
-    if len(indices_to_remove) > 0:
-        url = f"{settings.ENDPOINT}/_aliases"
-        headers = {"Content-Type": "application/json"}
-        data = {
-            "actions": [
-                {"remove": {"indices": indices_to_remove, "alias": alias}}
-            ]
-        }
-
-        r = requests.post(
-            url, headers=headers, data=json.dumps(data), auth=settings.get_auth()
-        )
-        if not (200 <= r.status_code <= 299):
-            print_opensearch_error(r, url)
-            r.raise_for_status()
-        print(f"removed indices `{indices_to_remove}` from alias `{alias}`")
-
-
-def delete_old_collection_indices(collection_id: str):
-    """
-    Deletes older unaliased indices, retaining a specified number
-    """
-    url = f"{settings.ENDPOINT}/rikolti-{collection_id}-*"
-    params = {"ignore_unavailable": "true"}
-    r = requests.get(url=url, params=params, auth=settings.get_auth())
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    indices = json.loads(r.text)
-
-    unaliased_indices = {}
-    for index in indices.keys():
-        if not indices[index]["aliases"]:
-            creation_date = indices[index]["settings"]["index"]["creation_date"]
-            unaliased_indices[creation_date] = index
-
-    counter = 0
-    for date in reversed(sorted(unaliased_indices)):
-        counter += 1
-        if counter > int(settings.INDEX_RETENTION):
-            delete_index(unaliased_indices[date])
-
-
-def delete_index(index: str):
-    url = f"{settings.ENDPOINT}/{index}"
-
-    r = requests.delete(url, auth=settings.get_auth())
-    if r.status_code == 404:
-        return
-    else:
-        if not (200 <= r.status_code <= 299):
-            print_opensearch_error(r, url)
-            r.raise_for_status()
-    print(f"deleted index `{index}`")
-
-
-def create_index_name(collection_id: str):
-    version = datetime.today().strftime("%Y%m%d%H%M%S")
-    return f"rikolti-{collection_id}-{version}"
-
-
-def create_new_index(collection_id: str, version_pages: list[str], index_name: str):
-    # OpenSearch creates the index on the fly when first written to.
-    for version_page in version_pages:
-        add_page(version_page, index_name)
-
-    update_alias_for_collection(settings.STAGE_ALIAS, collection_id, index_name)
-
-    delete_old_collection_indices(collection_id)
-
-    return index_name
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Add collection data to OpenSearch")
-    parser.add_argument("collection_id", help="Registry collection ID")
-    parser.add_argument("version", help="Metadata verison to index")
-    args = parser.parse_args(sys.argv[1:])
-    if 'merged' in args.version:
-        page_list = get_merged_pages(args.version)
-    else:
-        page_list = get_with_content_urls_pages(args.version)
-    create_new_index(args.collection_id, page_list)
-    sys.exit(0)
diff --git a/record_indexer/docker-compose.yml b/record_indexer/docker-compose.yml
new file mode 100644
index 000000000..b4492e67f
--- /dev/null
+++ b/record_indexer/docker-compose.yml
@@ -0,0 +1,9 @@
+services:
+  opensearch-node:
+    image: opensearchproject/opensearch:latest
+    ports:
+      - "9200:9200"
+      - "9600:9600"
+    environment:
+      - "discovery.type=single-node"
+      - OPENSEARCH_INITIAL_ADMIN_PASSWORD=Rikolti_05
diff --git a/record_indexer/index_templates/rikolti_template.py b/record_indexer/index_templates/rikolti_template.py
index 9c6f4437f..1bf413ed1 100644
--- a/record_indexer/index_templates/rikolti_template.py
+++ b/record_indexer/index_templates/rikolti_template.py
@@ -52,6 +52,7 @@ def main():
         headers={"Content-Type": "application/json"},
         data=json.dumps(RECORD_INDEX_CONFIG),
         auth=settings.get_auth(),
+        verify=settings.verify_certs()
     )
     if not (200 <= r.status_code <= 299):
         print_opensearch_error(r, url)
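For orientation, the request `rikolti_template.py` sends has the general shape sketched below. The real `RECORD_INDEX_CONFIG` is much more detailed; the `rikolti*` pattern comes from the README above, while the settings and mappings shown here are illustrative assumptions only:

```python
# Sketch of an OpenSearch composable index-template request; any new index
# whose name matches the pattern picks up these settings and mappings.
import json
import os

import requests

endpoint = os.environ["OPENSEARCH_ENDPOINT"].rstrip("/")
template = {
    "index_patterns": ["rikolti*"],  # applies to rikolti-stg indices, etc.
    "template": {
        "settings": {"index": {"number_of_shards": 1}},  # illustrative
        "mappings": {"properties": {"title": {"type": "text"}}},  # illustrative
    },
}
r = requests.put(
    f"{endpoint}/_index_template/rikolti_template",
    headers={"Content-Type": "application/json"},
    data=json.dumps(template),
    auth=(os.environ["OPENSEARCH_USER"], os.environ["OPENSEARCH_PASS"]),
    verify=False,  # local container uses a self-signed certificate
)
r.raise_for_status()
```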
diff --git a/record_indexer/initialize/add_rikolti-stg_index.py b/record_indexer/initialize/add_rikolti-stg_index.py
new file mode 100644
index 000000000..243d51b62
--- /dev/null
+++ b/record_indexer/initialize/add_rikolti-stg_index.py
@@ -0,0 +1,83 @@
+import argparse
+import json
+import sys
+from datetime import datetime
+
+import requests
+
+from .. import settings
+from ..utils import print_opensearch_error
+
+"""
+    Create a rikolti index and add the rikolti-stg alias to it
+    https://opensearch.org/docs/2.3/opensearch/index-templates/
+"""
+
+
+def main(name=None):
+    # check if index with rikolti-stg alias exists
+    url = f"{settings.ENDPOINT}/_alias/rikolti-stg"
+    r = requests.get(
+        url,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if r.status_code == 200:
+        print("Index with alias rikolti-stg already exists; OpenSearch "
+              "instance ready for use")
+        return
+
+    # create index
+    if not name:
+        name = datetime.now().strftime("%Y%m%d%H%M%S")
+
+    index_name = f"rikolti-{name}"
+    url = f"{settings.ENDPOINT}/{index_name}"
+    r = requests.put(
+        url,
+        headers={"Content-Type": "application/json"},
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if not (200 <= r.status_code <= 299):
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+    print(r.text)
+
+    # add alias
+    url = f"{settings.ENDPOINT}/_aliases"
+    data = {
+        "actions": [
+            {"add": {"index": index_name, "alias": "rikolti-stg"}}
+        ]
+    }
+    r = requests.post(
+        url,
+        headers={"Content-Type": "application/json"},
+        data=json.dumps(data),
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
+    if not (200 <= r.status_code <= 299):
+        print_opensearch_error(r, url)
+        r.raise_for_status()
+    print(r.text)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=(
+            "Creates an empty index at rikolti-<name>; if no name "
+            "provided, uses the current timestamp. Adds the index to "
+            "the rikolti-stg alias."
+        )
+    )
+    parser.add_argument(
+        "-n", "--name",
+        help=(
+            "Optional name for the index; created index will be "
+            "rikolti-<name>; if none provided, the current timestamp "
+            "will be used"
+        )
+    )
+    args = parser.parse_args()
+    sys.exit(main(args.name))
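After `add_rikolti-stg_index` runs, the `rikolti-stg` alias should resolve to exactly one index. A minimal sketch for confirming that, assuming the same env vars read by `settings.py`:

```python
# Sketch: confirm the rikolti-stg alias points at the newly created index.
import os

import requests

endpoint = os.environ["OPENSEARCH_ENDPOINT"].rstrip("/")
r = requests.get(
    f"{endpoint}/_alias/rikolti-stg",
    auth=(os.environ["OPENSEARCH_USER"], os.environ["OPENSEARCH_PASS"]),
    verify=not os.environ.get("OPENSEARCH_IGNORE_TLS", False),
)
r.raise_for_status()
# e.g. {"rikolti-20240101120000": {"aliases": {"rikolti-stg": {}}}}
print(r.json())
```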
diff --git a/record_indexer/move_index_to_prod.py b/record_indexer/move_index_to_prod.py
deleted file mode 100644
index 8567ffa67..000000000
--- a/record_indexer/move_index_to_prod.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import argparse
-import json
-import sys
-
-import requests
-
-from .create_collection_index import update_alias_for_collection
-from . import settings
-from .utils import print_opensearch_error
-
-
-def move_index_to_prod(collection_id: str):
-    """
-    Add current rikolti-stg index to rikolti-prd alias
-    """
-    url = f"{settings.ENDPOINT}/_alias/{settings.STAGE_ALIAS}"
-    r = requests.get(url=url, auth=settings.get_auth())
-    if not (200 <= r.status_code <= 299):
-        print_opensearch_error(r, url)
-        r.raise_for_status()
-    indices = json.loads(r.text)
-    indices_for_collection = [
-        key for key in indices if key.startswith(f"rikolti-{collection_id}-")
-    ]
-
-    if len(indices_for_collection) == 1:
-        update_alias_for_collection(
-            "rikolti-prd", collection_id, indices_for_collection[0]
-        )
-    elif len(indices_for_collection) > 1:
-        print(
-            f"{collection_id}: More than one index associated with "
-            f"`{settings.STAGE_ALIAS}` alias: `{indices_for_collection}`"
-        )
-        return
-    elif len(indices_for_collection) < 1:
-        print(
-            f"{collection_id}: Cannot find any indices associated with "
-            f"`{settings.STAGE_ALIAS}` alias"
-        )
-        return
-
-
-if __name__ == "__main__":
-    parser = argparse.ArgumentParser(description="Add staged index to production")
-    parser.add_argument("collection_id", help="Registry collection ID")
-    args = parser.parse_args(sys.argv[1:])
-    move_index_to_prod(args.collection_id)
-    sys.exit(0)
diff --git a/record_indexer/settings.py b/record_indexer/settings.py
index 4ab6db1f2..95842fd96 100644
--- a/record_indexer/settings.py
+++ b/record_indexer/settings.py
@@ -6,7 +6,16 @@
 load_dotenv()
 
+es_user = os.environ.get("OPENSEARCH_USER")
+es_pass = os.environ.get("OPENSEARCH_PASS")
+
+def verify_certs():
+    return not os.environ.get("OPENSEARCH_IGNORE_TLS", False)
+
 def get_auth():
+    if es_user and es_pass:
+        return (es_user, es_pass)
+
     credentials = Session().get_credentials()
     if not credentials:
         return False
@@ -14,8 +23,6 @@
         credentials, os.environ.get("AWS_REGION", "us-west-2"))
 
-ENDPOINT = os.environ.get("RIKOLTI_ES_ENDPOINT", False)
+ENDPOINT = os.environ.get("OPENSEARCH_ENDPOINT", False)
 if ENDPOINT:
     ENDPOINT = ENDPOINT.rstrip("/")
-INDEX_RETENTION = os.environ.get("INDEX_RETENTION", 0)
-STAGE_ALIAS = os.environ.get("RIKOLTI_ES_STAGE_ALIAS")
diff --git a/record_indexer/update_stage_index.py b/record_indexer/update_stage_index.py
index e35ada2bb..ff68e9740 100644
--- a/record_indexer/update_stage_index.py
+++ b/record_indexer/update_stage_index.py
@@ -7,7 +7,7 @@
 def update_stage_index_for_collection(collection_id: str, version_pages: list[str]):
     ''' update stage index with a new set of collection records '''
-    index = get_index_for_alias(settings.STAGE_ALIAS)
+    index = get_index_for_alias("rikolti-stg")
 
     # delete existing records
     delete_collection_records_from_index(collection_id, index)
@@ -19,7 +19,11 @@
 def get_index_for_alias(alias: str):
     # for now, there should be only one index per alias (stage, prod)
     url = f"{settings.ENDPOINT}/_alias/{alias}"
-    r = requests.get(url, auth=settings.get_auth())
+    r = requests.get(
+        url,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
     if not (200 <= r.status_code <= 299):
         print_opensearch_error(r, url)
         r.raise_for_status()
@@ -43,7 +47,12 @@ def delete_collection_records_from_index(collection_id: str, index: str):
     }
 
     r = requests.post(
-        url=url, data=json.dumps(data), headers=headers, auth=settings.get_auth())
+        url=url,
+        data=json.dumps(data),
+        headers=headers,
+        auth=settings.get_auth(),
+        verify=settings.verify_certs()
+    )
     if not (200 <= r.status_code <= 299):
         print_opensearch_error(r, url)
         r.raise_for_status()
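A note on the `settings.py` changes: `get_auth()` now prefers basic auth when `OPENSEARCH_USER`/`OPENSEARCH_PASS` are set (the local docker case) and falls back to AWS request signing otherwise. Worth flagging as well: because environment variables are strings, any non-empty `OPENSEARCH_IGNORE_TLS` value, including the string `False`, disables certificate verification. A condensed view of the new decision order follows; the `AWSV4SignerAuth` class is an assumption, since the diff only shows the second line of that call:

```python
# Condensed view of the auth fallback in settings.py. The signer class is
# assumed (not visible in the diff); the basic-auth branch matches it exactly.
import os

from boto3 import Session
from opensearchpy import AWSV4SignerAuth  # assumed signer; not shown in diff


def get_auth():
    user = os.environ.get("OPENSEARCH_USER")
    password = os.environ.get("OPENSEARCH_PASS")
    if user and password:
        return (user, password)  # requests-style basic auth tuple
    credentials = Session().get_credentials()
    if not credentials:
        return False
    return AWSV4SignerAuth(
        credentials, os.environ.get("AWS_REGION", "us-west-2"))
```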