Skip to content

Commit

Permalink
Indexing: remove fields not in opensearch schema
Browse files Browse the repository at this point in the history
  • Loading branch information
amywieliczka committed Sep 26, 2024
1 parent 6a3705b commit 89ca7e0
Showing 1 changed file with 42 additions and 18 deletions.
60 changes: 42 additions & 18 deletions record_indexer/index_page.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,54 @@ def build_bulk_request_body(records: list, index: str):
return body


def remove_unexpected_fields(record: dict, schema: dict):
    """Strip fields from ``record`` (in place) that are absent from the
    OpenSearch index schema, recursing into object/nested subfields.

    Args:
        record: a record dict; mutated in place.
        schema: the index mapping's ``properties`` dict. A field entry may
            carry its own ``properties`` subschema, and ``type: nested``
            marks a list of child records rather than a single object.

    Returns:
        A de-duplicated list of removed field names; removals inside
        subrecords are reported as dotted paths (``"parent.child"``).
    """
    removed_fields = []
    # Iterate over a snapshot of the keys: popping from `record` while
    # iterating record.keys() directly raises RuntimeError (dictionary
    # changed size during iteration) on the first removed field.
    for field in list(record.keys()):
        if field not in schema:
            removed_fields.append(field)
            record.pop(field)
            continue

        subschema = schema[field].get('properties')
        if subschema:
            if schema[field].get('type') == 'nested':
                # recursively remove fields for a list of nested records
                for child in record[field]:
                    removed = remove_unexpected_fields(child, subschema)
                    removed_fields += [
                        f"{field}.{subfield}" for subfield in removed
                    ]
            else:
                # recursively remove fields for a single nested record
                removed = remove_unexpected_fields(record[field], subschema)
                removed_fields += [
                    f"{field}.{subfield}" for subfield in removed
                ]

    # remove duplicates - if a field is nested, we'd see
    # 'children.removed_field' for each child containing the removed
    # field, potentially hundreds of times
    return list(set(removed_fields))


def get_opensearch_schema(index_alias: str):
    """Fetch the field mapping ("properties") for the index behind
    ``index_alias`` from the OpenSearch cluster.

    Args:
        index_alias: the alias (or index name) to look up.

    Returns:
        The mapping's ``properties`` dict, or None if the response
        carries no mappings.

    Raises:
        requests.HTTPError: on a non-2xx response (after logging it).
    """
    url = f"{settings.ENDPOINT}/{index_alias}/_mapping"
    r = requests.get(
        url,
        headers={"Content-Type": "application/json"},
        auth=settings.get_auth(),
        verify=settings.verify_certs()
    )
    if not (200 <= r.status_code <= 299):
        print_opensearch_error(r, url)
        r.raise_for_status()

    # The _mapping response is keyed by the concrete index name backing
    # the alias, e.g. {"rikolti-123": {"mappings": {"properties": {...}}}},
    # not by "mappings" at the top level — unwrap the (single) index entry
    # before reading its mappings. An alias used for indexing resolves to
    # exactly one write index.
    mappings_by_index = r.json()
    index_mapping = next(iter(mappings_by_index.values()), {})
    return index_mapping.get('mappings', {}).get('properties')


def index_page(version_page: str, index: str, rikolti_data: dict):
Expand All @@ -100,10 +124,10 @@ def index_page(version_page: str, index: str, rikolti_data: dict):
else:
records = get_with_content_urls_page_content(version_page)

expected_fields = get_expected_fields()
schema = get_opensearch_schema(index)
removed_fields_report = defaultdict(list)
for record in records:
removed_fields = remove_unexpected_fields(record, expected_fields)
removed_fields = remove_unexpected_fields(record, schema)
calisphere_id = record.get("calisphere-id", None)
for field in removed_fields:
removed_fields_report[field].append(calisphere_id)
Expand Down

0 comments on commit 89ca7e0

Please sign in to comment.