Skip to content

Commit

Permalink
[Fetch Migration] Migrate index templates and component templates as …
Browse files Browse the repository at this point in the history
…a part of metadata migration (opensearch-project#477)

This change enables index template and component template migration from source to target cluster as a part of the metadata migration step of Fetch Migration. Now that metadata migration performs multiple distinct steps, these have been refactored into encapsulated functions rather than being inline in the run entrypoint.

New functions have been added to index_operations.py to fetch and create component templates and index templates. The IndexTemplateInfo class derives from the ComponentTemplateInfo class since the former can include a "template" definition that overrides its components, and it includes additional fields (such as "composed_of", "priority" and "index_patterns").

Unit tests have been added to maintain high test coverage.

---------

Signed-off-by: Kartik Ganesh <gkart@amazon.com>
  • Loading branch information
kartg committed Jan 25, 2024
1 parent f7b10a9 commit 71420f2
Show file tree
Hide file tree
Showing 7 changed files with 466 additions and 40 deletions.
34 changes: 34 additions & 0 deletions FetchMigration/python/component_template_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#


# Constants
from typing import Optional

NAME_KEY = "name"
DEFAULT_TEMPLATE_KEY = "component_template"


# Class that encapsulates component template information
class ComponentTemplateInfo:
    """Holds the name and (optional) template definition for a component template.

    The payload is expected to be a single entry from the cluster's component
    template API response, i.e. a dict with a "name" key and, usually, a
    "component_template" key holding the template definition.
    """

    # Private member variables
    __name: str
    __template_def: Optional[dict]

    def __init__(self, template_payload: dict, template_key: str = DEFAULT_TEMPLATE_KEY):
        """Extract the template name and definition from the payload.

        :param template_payload: single template entry from the cluster API response
        :param template_key: key under which the template definition is stored;
            subclasses may supply a different key (e.g. "index_template")
        :raises KeyError: if the payload has no "name" entry
        """
        self.__name = template_payload[NAME_KEY]
        # dict.get returns None when the key is absent, matching the Optional type
        self.__template_def = template_payload.get(template_key)

    def get_name(self) -> str:
        """Return the template's name."""
        return self.__name

    def get_template_definition(self) -> Optional[dict]:
        """Return the template definition, or None if the payload had none."""
        return self.__template_def
61 changes: 61 additions & 0 deletions FetchMigration/python/index_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@
import jsonpath_ng
import requests

from component_template_info import ComponentTemplateInfo
from endpoint_info import EndpointInfo
from index_doc_count import IndexDocCount
from index_template_info import IndexTemplateInfo

# Constants
SETTINGS_KEY = "settings"
MAPPINGS_KEY = "mappings"
ALIASES_KEY = "aliases"
COUNT_KEY = "count"
__INDEX_KEY = "index"
__COMPONENT_TEMPLATE_LIST_KEY = "component_templates"
__INDEX_TEMPLATE_LIST_KEY = "index_templates"
__INDEX_TEMPLATES_PATH = "/_index_template"
__COMPONENT_TEMPLATES_PATH = "/_component_template"
__ALL_INDICES_ENDPOINT = "*"
# (ES 7+) size=0 avoids the "hits" payload to reduce the response size since we're only interested in the aggregation,
# and track_total_hits forces an accurate doc-count
Expand Down Expand Up @@ -106,3 +112,58 @@ def doc_count(indices: set, endpoint: EndpointInfo) -> IndexDocCount:
return IndexDocCount(total, count_map)
except RuntimeError as e:
raise RuntimeError(f"Failed to fetch doc_count: {e!s}")


def __fetch_templates(endpoint: EndpointInfo, path: str, root_key: str, factory) -> set:
    """Fetch templates from the given cluster API path and wrap each entry via the factory.

    :param endpoint: cluster endpoint to query
    :param path: REST path for the template API (e.g. "/_index_template")
    :param root_key: key in the JSON response under which the template list lives
    :param factory: callable that converts one template payload dict into an info object
    :return: set of objects produced by the factory (empty if root_key is absent)
    :raises RuntimeError: if the underlying GET request fails
    """
    url: str = endpoint.add_path(path)
    # raises RuntimeError in case of any request errors
    try:
        resp = __send_get_request(url, endpoint)
        # Parse the JSON body once instead of re-parsing it per access
        payload = resp.json()
        result = set()
        for template in payload.get(root_key, []):
            result.add(factory(template))
        return result
    except RuntimeError as e:
        # Chain the underlying exception as a cause
        raise RuntimeError("Failed to fetch template metadata from cluster endpoint") from e


def fetch_all_component_templates(endpoint: EndpointInfo) -> set[ComponentTemplateInfo]:
    """Fetch all component templates from the cluster at the given endpoint.

    :return: set of ComponentTemplateInfo objects, one per component template
    :raises RuntimeError: if the underlying API request fails
    """
    try:
        # raises RuntimeError in case of any request errors;
        # the constructor itself is the factory, no lambda wrapper needed
        return __fetch_templates(endpoint, __COMPONENT_TEMPLATES_PATH, __COMPONENT_TEMPLATE_LIST_KEY,
                                 ComponentTemplateInfo)
    except RuntimeError as e:
        raise RuntimeError("Failed to fetch component template metadata") from e


def fetch_all_index_templates(endpoint: EndpointInfo) -> set[IndexTemplateInfo]:
    """Fetch all (composable) index templates from the cluster at the given endpoint.

    :return: set of IndexTemplateInfo objects, one per index template
    :raises RuntimeError: if the underlying API request fails
    """
    try:
        # raises RuntimeError in case of any request errors;
        # the constructor itself is the factory, no lambda wrapper needed
        return __fetch_templates(endpoint, __INDEX_TEMPLATES_PATH, __INDEX_TEMPLATE_LIST_KEY,
                                 IndexTemplateInfo)
    except RuntimeError as e:
        raise RuntimeError("Failed to fetch index template metadata") from e


def __create_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo, template_path: str) -> dict:
    """PUT each template to the target cluster under the given template API path.

    Failures are collected rather than raised, so one bad template does not
    abort the rest of the batch.

    :return: dict mapping template name to the exception for each failed creation;
        empty dict if all templates were created successfully
    """
    failures = dict()
    for template_info in templates:
        target_url = endpoint.add_path(f"{template_path}/{template_info.get_name()}")
        try:
            response = requests.put(target_url, auth=endpoint.get_auth(),
                                    verify=endpoint.is_verify_ssl(),
                                    json=template_info.get_template_definition(),
                                    timeout=__TIMEOUT_SECONDS)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Record the failure and keep going with the remaining templates
            failures[template_info.get_name()] = e
    # Loop completed, return failures if any
    return failures


def create_component_templates(templates: set[ComponentTemplateInfo], endpoint: EndpointInfo) -> dict:
    """Create the given component templates on the target cluster.

    :return: dict of template name to error for any failed creations; empty on full success
    """
    return __create_templates(templates, endpoint, template_path=__COMPONENT_TEMPLATES_PATH)


def create_index_templates(templates: set[IndexTemplateInfo], endpoint: EndpointInfo) -> dict:
    """Create the given index templates on the target cluster.

    :return: dict of template name to error for any failed creations; empty on full success
    """
    return __create_templates(templates, endpoint, template_path=__INDEX_TEMPLATES_PATH)
23 changes: 23 additions & 0 deletions FetchMigration/python/index_template_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright OpenSearch Contributors
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#

from component_template_info import ComponentTemplateInfo

# Constants
INDEX_TEMPLATE_KEY = "index_template"


# Class that encapsulates index template information from a cluster.
# Subclass of ComponentTemplateInfo because the structure of an index
# template is identical to a component template, except that it uses
# a different template key. Also, index templates can be "composed" of
# one or more component templates.
class IndexTemplateInfo(ComponentTemplateInfo):
    """Encapsulates a single index template entry from the cluster API response."""

    def __init__(self, template_payload: dict):
        """Delegate to the base class, overriding the payload key used for index templates."""
        super().__init__(template_payload, template_key=INDEX_TEMPLATE_KEY)
115 changes: 78 additions & 37 deletions FetchMigration/python/metadata_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import endpoint_utils
import index_operations
import utils
from endpoint_info import EndpointInfo
from index_diff import IndexDiff
from metadata_migration_params import MetadataMigrationParams
from metadata_migration_result import MetadataMigrationResult
Expand Down Expand Up @@ -50,60 +51,100 @@ def print_report(diff: IndexDiff, total_doc_count: int): # pragma no cover
logging.info("Target document count: " + str(total_doc_count))


def index_metadata_migration(source: EndpointInfo, target: EndpointInfo,
                             args: MetadataMigrationParams) -> MetadataMigrationResult:
    """Migrate index metadata (settings/mappings) from the source to the target cluster.

    Computes the set of indices eligible for migration (indices missing on the
    target, plus identical indices that are empty on the target), optionally
    prints a diff report, and — unless args.dryrun is set — creates the missing
    indices on the target cluster.

    :param source: source cluster endpoint
    :param target: target cluster endpoint
    :param args: migration parameters (report and dryrun flags are used here)
    :return: MetadataMigrationResult with the indices to migrate and total doc count
    :raises RuntimeError: if creation of any index on the target fails
    """
    result = MetadataMigrationResult()
    # Fetch indices
    source_indices = index_operations.fetch_all_indices(source)
    # If source indices is empty, return immediately
    if len(source_indices.keys()) == 0:
        return result
    target_indices = index_operations.fetch_all_indices(target)
    # Compute index differences and create result object
    diff = IndexDiff(source_indices, target_indices)
    if diff.identical_indices:
        # Identical indices with zero documents on the target are eligible for migration
        target_doc_count = index_operations.doc_count(diff.identical_indices, target)
        # doc_count only returns indices that have non-zero counts, so the difference in responses
        # gives us the set of identical, empty indices
        result.migration_indices = diff.identical_indices.difference(target_doc_count.index_doc_count_map.keys())
        diff.set_identical_empty_indices(result.migration_indices)
    if diff.indices_to_create:
        result.migration_indices.update(diff.indices_to_create)
    if result.migration_indices:
        doc_count_result = index_operations.doc_count(result.migration_indices, source)
        result.target_doc_count = doc_count_result.total
    # Print report
    if args.report:
        print_report(diff, result.target_doc_count)
    # Create index metadata on target
    if result.migration_indices and not args.dryrun:
        index_data = dict()
        for index_name in diff.indices_to_create:
            index_data[index_name] = source_indices[index_name]
        failed_indices = index_operations.create_indices(index_data, target)
        fail_count = len(failed_indices)
        if fail_count > 0:
            logging.error(f"Failed to create {fail_count} of {len(index_data)} indices")
            for failed_index_name, error in failed_indices.items():
                logging.error(f"Index name {failed_index_name} failed: {error!s}")
            raise RuntimeError("Metadata migration failed, index creation unsuccessful")
    return result


# Returns true if there were failures, false otherwise
def __log_template_failures(failures: dict, target_count: int) -> bool:
    """Log any template-creation failures and report whether there were any.

    :param failures: map of template name to the error that caused its creation to fail
    :param target_count: total number of templates for which creation was attempted
    :return: True if there was at least one failure, False otherwise
    """
    # Guard clause: nothing to log on full success
    if not failures:
        return False
    logging.error(f"Failed to create {len(failures)} of {target_count} templates")
    for failed_template_name, error in failures.items():
        logging.error(f"Template name {failed_template_name} failed: {error!s}")
    # Return true to signal failures
    return True


# Raises RuntimeError if component/index template migration fails
def template_migration(source: EndpointInfo, target: EndpointInfo):
    """Migrate component templates, then index templates, from source to target.

    Component templates are migrated first because index templates may be
    composed of them. If any component template fails to create, index template
    migration is not attempted.

    :raises RuntimeError: if any component or index template creation fails
    """
    # Fetch and migrate component templates first
    templates = index_operations.fetch_all_component_templates(source)
    failures = index_operations.create_component_templates(templates, target)
    # Guard clause: abort before index templates if any component template failed
    if __log_template_failures(failures, len(templates)):
        raise RuntimeError("Failed to create some component templates, aborting index template creation")
    # Only migrate index templates if component template migration had no failures
    templates = index_operations.fetch_all_index_templates(source)
    failures = index_operations.create_index_templates(templates, target)
    if __log_template_failures(failures, len(templates)):
        raise RuntimeError("Failed to create some index templates")


def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
    """Entry point for metadata migration.

    Validates the Data Prepper pipeline config, migrates index metadata,
    writes the output pipeline YAML (when an output file is configured),
    and migrates templates unless this is a dry run.

    :raises ValueError: if no output file is specified and no report was requested,
        or if the pipeline config lacks source/sink definitions
    :raises RuntimeError: if template migration fails
    """
    # Sanity check
    if not args.report and len(args.output_file) == 0:
        raise ValueError("No output file specified")
    # Parse and validate pipelines YAML file
    with open(args.config_file_path, 'r') as pipeline_file:
        dp_config = yaml.safe_load(pipeline_file)
    # We expect the Data Prepper pipeline to only have a single top-level value
    pipeline_config = next(iter(dp_config.values()))
    # Raises a ValueError if source or sink definitions are missing
    endpoint_utils.validate_pipeline(pipeline_config)
    source_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config, endpoint_utils.SOURCE_KEY)
    target_info = endpoint_utils.get_endpoint_info_from_pipeline_config(pipeline_config, endpoint_utils.SINK_KEY)
    result = index_metadata_migration(source_info, target_info, args)
    if result.migration_indices and len(args.output_file) > 0:
        # Write output YAML
        write_output(dp_config, result.migration_indices, args.output_file)
        logging.debug("Wrote output YAML pipeline to: " + args.output_file)
    if not args.dryrun:
        # Create component and index templates, may raise RuntimeError
        template_migration(source_info, target_info)
    # Finally return result
    return result


Expand Down Expand Up @@ -135,6 +176,6 @@ def run(args: MetadataMigrationParams) -> MetadataMigrationResult:
arg_parser.add_argument("--report", "-r", action="store_true",
help="Print a report of the index differences")
arg_parser.add_argument("--dryrun", action="store_true",
help="Skips the actual creation of indices on the target cluster")
help="Skips the actual creation of metadata on the target cluster")
namespace = arg_parser.parse_args()
run(MetadataMigrationParams(namespace.config_file_path, namespace.output_file, namespace.report, namespace.dryrun))
Loading

0 comments on commit 71420f2

Please sign in to comment.