Skip to content

Commit

Permalink
fixup! Add JSONL-based verbatim manifest format (#6028)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Mar 15, 2024
1 parent d4841b1 commit 5cc5ae9
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions src/azul/service/manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,6 +2011,28 @@ def qualify(qualifier, column_name, index=None):
bundle_tsv_writer.writerow(row)


@attrs.frozen(kw_only=True)
class ReplicaKeys:
    """
    The pair of identifiers used to look up the replicas belonging to a hub.

    Most replicas contain a list of the replica's hubs and can be found via
    that list. However, some entities (e.g. projects) have too many hubs to
    track within the replica document. Replicas of such entities are instead
    retrieved by their entity ID.
    """
    # ID of the hub; matched against the replicas' `hub_ids` field
    hub_id: str
    # Entity ID of the implicit hub; matched against the replicas' `entity_id`
    # field
    implicit_hub_entity_id: str

    @classmethod
    def prepare_query(cls, keys: Iterable[Self]) -> Q:
        """
        Build a `bool` query with one `terms` clause per lookup field, both
        given as `should` clauses: one on `hub_ids.keyword` listing the
        distinct hub IDs of the given keys, and one on `entity_id.keyword`
        listing their distinct implicit hub entity IDs.

        :param keys: the keys to collect the query terms from

        :return: the query combining both `terms` clauses
        """
        hub_ids = set()
        entity_ids = set()
        # Collect both kinds of ID in one pass so that `keys` is iterated
        # exactly once.
        for key in keys:
            hub_ids.add(key.hub_id)
            entity_ids.add(key.implicit_hub_entity_id)
        clauses = [
            {'terms': {'hub_ids.keyword': list(hub_ids)}},
            {'terms': {'entity_id.keyword': list(entity_ids)}},
        ]
        return Q('bool', should=clauses)


class VerbatimManifestGenerator(FileBasedManifestGenerator):

@property
Expand Down Expand Up @@ -2043,17 +2065,13 @@ def included_fields(self) -> list[FieldPath]:
def implicit_hub_type(self) -> str:
return self.service.metadata_plugin(self.catalog).implicit_hub_type

def _replica_keys(self) -> Iterable[dict[str, str]]:
def _replica_keys(self) -> Iterable[ReplicaKeys]:
request = self._create_request()
for hit in request.scan():
yield {
# Most replicas track their hubs explicitly, however...
'hub_ids': hit['entity_id'],
# ... for projects and datasets, there are too many hubs to
# track them all in the replica, so they are instead retrieved
# by entity ID.
'entity_id': one(one(hit['contents'][self.implicit_hub_type])['document_id'])
}
yield ReplicaKeys(
hub_id=hit['entity_id'],
implicit_hub_entity_id=one(one(hit['contents'][self.implicit_hub_type])['document_id'])
)

def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids = set()
Expand All @@ -2076,20 +2094,13 @@ def _all_replicas(self) -> Iterable[JSON]:
emitted_replica_ids.add(replica_id)

def _join_replicas(self,
keys_page: Iterable[dict[str, str]]
keys_page: Iterable[ReplicaKeys]
) -> Iterable[Hit]:
terms_by_field = defaultdict(set)
for keys in keys_page:
for field, term in keys.items():
terms_by_field[field].add(term)

request = self.service.create_request(catalog=self.catalog,
entity_type='replica',
doc_type=DocumentType.replica)
request = request.query(Q('bool', should=[
{'terms': {f'{field}.keyword': list(terms)}}
for field, terms in terms_by_field.items()
]
))
request = request.query(ReplicaKeys.prepare_query(keys_page))
return request.scan()

def create_file(self) -> tuple[str, Optional[str]]:
Expand Down

0 comments on commit 5cc5ae9

Please sign in to comment.