diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 1dd20de30..5d7489ce2 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -2011,6 +2011,28 @@ def qualify(qualifier, column_name, index=None): bundle_tsv_writer.writerow(row) +@attrs.frozen(kw_only=True) +class ReplicaKeys: + """ + Most replicas contain a list of the replica's hubs. However, some entities + (e.g. projects) have too many hubs to track within the replica document. + Replicas of such entities are instead retrieved by their entity ID. + """ + hub_id: str + implicit_hub_entity_id: str + + @classmethod + def prepare_query(cls, keys: Iterable[Self]) -> Q: + terms = {'hub_ids': set(), 'entity_id': set()} + for self in keys: + terms['hub_ids'].add(self.hub_id) + terms['entity_id'].add(self.implicit_hub_entity_id) + return Q('bool', should=[ + {'terms': {f'{field}.keyword': list(values)}} + for field, values in terms.items() + ]) + + class VerbatimManifestGenerator(FileBasedManifestGenerator): @property @@ -2043,17 +2065,13 @@ def included_fields(self) -> list[FieldPath]: def implicit_hub_type(self) -> str: return self.service.metadata_plugin(self.catalog).implicit_hub_type - def _replica_keys(self) -> Iterable[dict[str, str]]: + def _replica_keys(self) -> Iterable[ReplicaKeys]: request = self._create_request() for hit in request.scan(): - yield { - # Most replicas track their hubs explicitly, however... - 'hub_ids': hit['entity_id'], - # ... for projects and datasets, there are too many hubs to - # track them all in the replica, so they are instead retrieved - # by entity ID. - 'entity_id': one(one(hit['contents'][self.implicit_hub_type])['document_id']) - } + yield ReplicaKeys( + hub_id=hit['entity_id'], + implicit_hub_entity_id=one(one(hit['contents'][self.implicit_hub_type])['document_id']) + ) def _all_replicas(self) -> Iterable[JSON]: emitted_replica_ids = set() @@ -2076,20 +2094,13 @@ def _all_replicas(self) -> Iterable[JSON]: emitted_replica_ids.add(replica_id) def _join_replicas(self, - keys_page: Iterable[dict[str, str]] + keys_page: Iterable[ReplicaKeys] ) -> Iterable[Hit]: - terms_by_field = defaultdict(set) - for keys in keys_page: - for field, term in keys.items(): - terms_by_field[field].add(term) + request = self.service.create_request(catalog=self.catalog, entity_type='replica', doc_type=DocumentType.replica) - request = request.query(Q('bool', should=[ - {'terms': {f'{field}.keyword': list(terms)}} - for field, terms in terms_by_field.items() - ] - )) + request = request.query(ReplicaKeys.prepare_query(keys_page)) return request.scan() def create_file(self) -> tuple[str, Optional[str]]: