diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 1dd20de30..cb5520087 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -2011,6 +2011,33 @@ def qualify(qualifier, column_name, index=None): bundle_tsv_writer.writerow(row) +@attrs.frozen(kw_only=True) +class ReplicaKeys: + """ + For most entity types, the replica document contains an explicit list of its + the entity's hubs. However, entities + hub cardinality (e.g. projects) + have too many hubs to track within their replicas. These entities are instead + + """ + hub_id: str + # ... for projects and datasets, there are too many hubs to + # track them all in the replica, so they are instead retrieved + # by entity ID. + implicit_hub_entity_id: str + + @classmethod + def prepare_query(cls, keys: Iterable[Self]) -> Q: + terms = {'hub_ids': set(), 'entity_id': set()} + for self in keys: + terms['hub_ids'].add(self.hub_id) + terms['entity_id'].add(self.implicit_hub_entity_id) + return Q('bool', should=[ + {'terms': {f'{field}.keyword': list(values)}} + for field, values in terms.items() + ]) + + class VerbatimManifestGenerator(FileBasedManifestGenerator): @property @@ -2043,17 +2070,13 @@ def included_fields(self) -> list[FieldPath]: def implicit_hub_type(self) -> str: return self.service.metadata_plugin(self.catalog).implicit_hub_type - def _replica_keys(self) -> Iterable[dict[str, str]]: + def _replica_keys(self) -> Iterable[ReplicaKeys]: request = self._create_request() for hit in request.scan(): - yield { - # Most replicas track their hubs explicitly, however... - 'hub_ids': hit['entity_id'], - # ... for projects and datasets, there are too many hubs to - # track them all in the replica, so they are instead retrieved - # by entity ID. - 'entity_id': one(one(hit['contents'][self.implicit_hub_type])['document_id']) - } + yield ReplicaKeys( + hub_id=hit['entity_id'], + implicit_hub_entity_id=one(one(hit['contents'][self.implicit_hub_type])['document_id']) + ) def _all_replicas(self) -> Iterable[JSON]: emitted_replica_ids = set() @@ -2076,20 +2099,13 @@ def _all_replicas(self) -> Iterable[JSON]: emitted_replica_ids.add(replica_id) def _join_replicas(self, - keys_page: Iterable[dict[str, str]] + keys_page: Iterable[ReplicaKeys] ) -> Iterable[Hit]: - terms_by_field = defaultdict(set) - for keys in keys_page: - for field, term in keys.items(): - terms_by_field[field].add(term) + request = self.service.create_request(catalog=self.catalog, entity_type='replica', doc_type=DocumentType.replica) - request = request.query(Q('bool', should=[ - {'terms': {f'{field}.keyword': list(terms)}} - for field, terms in terms_by_field.items() - ] - )) + request = request.query(ReplicaKeys.prepare_query(keys_page)) return request.scan() def create_file(self) -> tuple[str, Optional[str]]: