Skip to content

Commit

Permalink
fixup! Add JSONL-based verbatim manifest format (#6028)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Mar 15, 2024
1 parent d4841b1 commit ea755a2
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions src/azul/service/manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,33 @@ def qualify(qualifier, column_name, index=None):
bundle_tsv_writer.writerow(row)


@attrs.frozen(kw_only=True)
class ReplicaKeys:
"""
For most entity types, the replica document contains an explicit list of its
the entity's hubs. However, entities
hub cardinality (e.g. projects)
have too many hubs to track within their replicas. These entities are instead
"""
hub_id: str
# ... for projects and datasets, there are too many hubs to
# track them all in the replica, so they are instead retrieved
# by entity ID.
implicit_hub_entity_id: str

@classmethod
def prepare_query(cls, keys: Iterable[Self]) -> Q:
terms = {'hub_ids': set(), 'entity_id': set()}
for self in keys:
terms['hub_ids'].add(self.hub_id)
terms['entity_id'].add(self.implicit_hub_entity_id)
return Q('bool', should=[
{'terms': {f'{field}.keyword': list(values)}}
for field, values in terms.items()
])


class VerbatimManifestGenerator(FileBasedManifestGenerator):

@property
Expand Down Expand Up @@ -2043,17 +2070,13 @@ def included_fields(self) -> list[FieldPath]:
def implicit_hub_type(self) -> str:
return self.service.metadata_plugin(self.catalog).implicit_hub_type

def _replica_keys(self) -> Iterable[dict[str, str]]:
def _replica_keys(self) -> Iterable[ReplicaKeys]:
request = self._create_request()
for hit in request.scan():
yield {
# Most replicas track their hubs explicitly, however...
'hub_ids': hit['entity_id'],
# ... for projects and datasets, there are too many hubs to
# track them all in the replica, so they are instead retrieved
# by entity ID.
'entity_id': one(one(hit['contents'][self.implicit_hub_type])['document_id'])
}
yield ReplicaKeys(
hub_id=hit['entity_id'],
implicit_hub_entity_id=one(one(hit['contents'][self.implicit_hub_type])['document_id'])
)

def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids = set()
Expand All @@ -2076,20 +2099,13 @@ def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids.add(replica_id)

def _join_replicas(self,
keys_page: Iterable[dict[str, str]]
keys_page: Iterable[ReplicaKeys]
) -> Iterable[Hit]:
terms_by_field = defaultdict(set)
for keys in keys_page:
for field, term in keys.items():
terms_by_field[field].add(term)

request = self.service.create_request(catalog=self.catalog,
entity_type='replica',
doc_type=DocumentType.replica)
request = request.query(Q('bool', should=[
{'terms': {f'{field}.keyword': list(terms)}}
for field, terms in terms_by_field.items()
]
))
request = request.query(ReplicaKeys.prepare_query(keys_page))
return request.scan()

def create_file(self) -> tuple[str, Optional[str]]:
Expand Down

0 comments on commit ea755a2

Please sign in to comment.