From e5bc3563b6a7b6da17442ee2511d109bcc1a1992 Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Fri, 20 Sep 2024 16:41:05 -0700 Subject: [PATCH] Fix: No replicas for donors in HCA (#6582) --- src/azul/indexer/index_service.py | 11 ++- src/azul/plugins/metadata/anvil/__init__.py | 2 - .../metadata/anvil/indexer/transform.py | 71 +++++-------------- .../plugins/metadata/hca/indexer/transform.py | 43 +++++------ 4 files changed, 42 insertions(+), 85 deletions(-) diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 1c93ae71c..ab9571afd 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -302,16 +302,21 @@ def transform(self, log.info('Transforming %i entities in partition %s of bundle %s, version %s.', num_entities, partition, bundle.uuid, bundle.version) contributions = [] - replicas = [] + replicas_by_coords = {} for transformer in transformers: for document in transformer.transform(partition): if isinstance(document, Contribution): contributions.append(document) elif isinstance(document, Replica): - replicas.append(document) + try: + dup = replicas_by_coords[document.coordinates] + except KeyError: + replicas_by_coords[document.coordinates] = document + else: + dup.hub_ids.extend(document.hub_ids) else: assert False, document - return contributions, replicas + return contributions, list(replicas_by_coords.values()) def create_indices(self, catalog: CatalogName): es_client = ESClientFactory.get() diff --git a/src/azul/plugins/metadata/anvil/__init__.py b/src/azul/plugins/metadata/anvil/__init__.py index 77fea54c3..0b88375ec 100644 --- a/src/azul/plugins/metadata/anvil/__init__.py +++ b/src/azul/plugins/metadata/anvil/__init__.py @@ -38,7 +38,6 @@ BiosampleTransformer, BundleTransformer, DatasetTransformer, - DiagnosisTransformer, DonorTransformer, FileTransformer, ) @@ -98,7 +97,6 @@ def transformer_types(self) -> Iterable[Type[BaseTransformer]]: BiosampleTransformer, BundleTransformer, DatasetTransformer, - DiagnosisTransformer, DonorTransformer, FileTransformer, ) diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index f26af3699..be019346e 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -98,6 +98,12 @@ class LinkedEntities: def __getitem__(self, item: EntityType) -> set[EntityReference]: return self.ancestors[item] | self.descendants[item] + def __iter__(self) -> Iterable[EntityReference]: + for entities in self.ancestors.values(): + yield from entities + for entities in self.descendants.values(): + yield from entities + @classmethod def from_links(cls, origin: EntityReference, @@ -459,8 +465,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]: class SingletonTransformer(BaseTransformer, metaclass=ABCMeta): - def _contents(self) -> MutableJSON: - return dict( + def _transform(self, entity: EntityReference) -> Iterable[Contribution]: + contents = dict( activities=self._entities(self._activity, chain.from_iterable( self._entities_by_type[activity_type] for activity_type in self._activity_polymorphic_types @@ -471,6 +477,7 @@ def _contents(self) -> MutableJSON: donors=self._entities(self._donor, self._entities_by_type['donor']), files=self._entities(self._file, self._entities_by_type['file']) ) + yield self._contribution(contents, entity) @classmethod def field_types(cls) -> FieldTypes: @@ -511,19 +518,15 @@ def entity_type(cls) -> str: def _transform(self, entity: 
EntityReference) -> Iterable[Contribution | Replica]: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=[self._activity(entity)], biosamples=self._entities(self._biosample, linked['biosample']), datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']) ) yield self._contribution(contents, entity) - if config.enable_replicas: - hub_ids = [f.entity_id for f in files] - yield self._replica(entity, hub_ids) class BiosampleTransformer(BaseTransformer): @@ -534,7 +537,6 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -544,25 +546,9 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']), ) yield self._contribution(contents, entity) - if config.enable_replicas: - hub_ids = [f.entity_id for f in files] - yield self._replica(entity, hub_ids) - - -class DiagnosisTransformer(BaseTransformer): - - def _transform(self, entity: EntityReference) -> Iterable[Replica]: - if config.enable_replicas: - files = self._linked_entities(entity)['file'] - hub_ids = [f.entity_id for f in files] - yield self._replica(entity, hub_ids) - - @classmethod - def entity_type(cls) -> EntityType: - return 'diagnoses' class BundleTransformer(SingletonTransformer): @@ -575,10 +561,6 @@ def _singleton(self) -> EntityReference: return EntityReference(entity_type='bundle', entity_id=self.bundle.uuid) - def _transform(self, entity: EntityReference) -> Iterable[Contribution]: - contents = self._contents() - yield self._contribution(contents, entity) - class DatasetTransformer(SingletonTransformer): @@ -589,20 +571,6 @@ def entity_type(cls) -> str: def _singleton(self) -> EntityReference: return self._only_dataset() - def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]: - contents = self._contents() - yield self._contribution(contents, entity) - if config.enable_replicas: - # Every file in a snapshot is linked to that snapshot's singular - # dataset, making an explicit list of hub IDs for the dataset both - # redundant and impractically large (we observe that for large - # snapshots, trying to track this many files in a single data structure - # causes a prohibitively high rate of conflicts during replica updates). - # Therefore, we leave the hub IDs field empty for datasets and rely on - # the tenet that every file is an implicit hub of its parent dataset. 
- hub_ids = [] - return self._replica(entity, hub_ids) - class DonorTransformer(BaseTransformer): @@ -612,7 +580,6 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -622,12 +589,9 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=[self._donor(entity)], - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']), ) yield self._contribution(contents, entity) - if config.enable_replicas: - hub_ids = [f.entity_id for f in files] - return self._replica(entity, hub_ids) class FileTransformer(BaseTransformer): @@ -651,8 +615,9 @@ def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica ) yield self._contribution(contents, entity) if config.enable_replicas: - # The result of the link traversal does not include the starting entity, - # so without this step the file itself wouldn't be included in its hubs - files = (entity, *linked['file']) - hub_ids = [f.entity_id for f in files] - return self._replica(entity, hub_ids) + hub_ids = [entity.entity_id] + # The result of the link traversal does not include the starting entity + yield self._replica(entity, hub_ids) + for linked_entity in linked: + yield self._replica(linked_entity, + hub_ids=[] if linked_entity.entity_type == 'dataset' else hub_ids) diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py index 79be27b35..c73d7ca89 100644 --- a/src/azul/plugins/metadata/hca/indexer/transform.py +++ b/src/azul/plugins/metadata/hca/indexer/transform.py @@ -1383,6 +1383,13 @@ def visit(self, entity: api.Entity) -> None: if not is_zarr or sub_name.endswith('.zattrs'): self.files[entity.document_id] = entity + @property + def entities(self) -> Iterable[EntityReference]: + for entity_dict in vars(self).values(): + for entity in entity_dict.values(): + yield EntityReference(entity_type=entity.schema_name, + entity_id=str(entity.document_id)) + ENTITY = TypeVar('ENTITY', bound=api.Entity) @@ -1464,8 +1471,14 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution | Repli ref = self._entity_ref(file) yield self._contribution(contents, ref) if config.enable_replicas: - hub_ids = list(map(str, visitor.files)) + hub_ids = [ref.entity_id] yield self._replica(ref, hub_ids) + yield self._replica(EntityReference(entity_type='bundles', + entity_id=self.bundle.uuid), + hub_ids) + for linked_entity in visitor.entities: + yield self._replica(linked_entity, + hub_ids=[] if linked_entity.entity_type == 'project' else hub_ids) def matrix_stratification_values(self, file: api.File) -> JSON: """ @@ -1562,9 +1575,6 @@ def _transform(self, projects=[self._project(self._api_project)]) ref = self._entity_ref(cell_suspension) yield self._contribution(contents, ref) - if config.enable_replicas: - hub_ids = list(map(str, visitor.files)) - yield self._replica(ref, hub_ids) class SampleTransformer(PartitionedTransformer): @@ -1611,9 +1621,6 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution | Repli projects=[self._project(self._api_project)]) ref = self._entity_ref(sample) yield self._contribution(contents, ref) - if config.enable_replicas: - 
hub_ids = list(map(str, visitor.files))
-            yield self._replica(ref, hub_ids)
 
 
 class BundleAsEntity(DatedEntity):
@@ -1649,11 +1656,11 @@ def _dated_entities(self) -> Iterable[DatedEntity]:
     def estimate(self, partition: BundlePartition) -> int:
         return int(partition.contains(self._singleton_id))
 
-    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution]:
         if partition.contains(self._singleton_id):
             yield from self._transform()
 
-    def _transform(self) -> Iterable[Contribution | Replica]:
+    def _transform(self) -> Iterable[Contribution]:
         # Project entities are not explicitly linked in the graph. The mere
         # presence of project metadata in a bundle indicates that all other
         # entities in that bundle belong to that project. Because of that we
@@ -1713,13 +1720,6 @@ def _transform(self) -> Iterable[Contribution | Replica]:
             projects=[self._project(self._api_project)])
         ref = self._entity_ref(self._singleton_entity())
         yield self._contribution(contents, ref)
-        if config.enable_replicas:
-            hub_ids = self._hub_ids(visitor)
-            return self._replica(ref, hub_ids)
-
-    @abstractmethod
-    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
-        raise NotImplementedError
 
 
 class ProjectTransformer(SingletonTransformer):
@@ -1731,14 +1731,6 @@ def _singleton_entity(self) -> DatedEntity:
     @classmethod
     def entity_type(cls) -> str:
         return 'projects'
 
-    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
-        # Every file in a snapshot is linked to that snapshot's singular
-        # project, making an explicit list of hub IDs for the project both
-        # redundant and impractically large. Therefore, we leave the hub IDs
-        # field empty for projects and rely on the tenet that every file is an
-        # implicit hub of its parent project.
-        return []
-
 
 class BundleTransformer(SingletonTransformer):
@@ -1755,6 +1747,3 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
     @classmethod
     def entity_type(cls) -> str:
         return 'bundles'
-
-    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
-        return list(map(str, visitor.files))
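-- 
Notes:

The index_service.py hunk deduplicates replicas by their document
coordinates: when several transformers emit a replica for the same
entity, the copies collapse into a single document whose hub IDs are the
concatenation of the individual hub ID lists. A minimal sketch of that
merge strategy, assuming simplified stand-ins (this Replica dataclass
and the merge_replicas helper are illustrations, not the real azul
types):

    from dataclasses import dataclass, field

    @dataclass
    class Replica:
        coordinates: str  # stand-in for azul's replica document coordinates
        hub_ids: list[str] = field(default_factory=list)

    def merge_replicas(replicas: list[Replica]) -> list[Replica]:
        # Keep the first replica seen per coordinates and fold the hub
        # IDs of any later duplicates into it, mirroring the dict-based
        # accumulation in the transform method above.
        by_coords: dict[str, Replica] = {}
        for replica in replicas:
            try:
                dup = by_coords[replica.coordinates]
            except KeyError:
                by_coords[replica.coordinates] = replica
            else:
                dup.hub_ids.extend(replica.hub_ids)
        return list(by_coords.values())

    merged = merge_replicas([Replica('donor/42', ['file_a']),
                             Replica('donor/42', ['file_b'])])
    assert [r.hub_ids for r in merged] == [['file_a', 'file_b']]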
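The transformer hunks move all replica emission into the file
transformers: a file yields a replica for itself and one for each entity
it is linked to, in every case with the file's own entity ID as the sole
hub. Datasets (AnVIL) and projects (HCA) are the exception and receive
an empty hub list, because every file is an implicit hub of its parent
dataset or project. A sketch of that fan-out under the same caveat
(EntityReference and replicas_for_file here are simplified, hypothetical
stand-ins, not the real azul signatures):

    from typing import Iterable, NamedTuple

    class EntityReference(NamedTuple):
        entity_type: str
        entity_id: str

    def replicas_for_file(file: EntityReference,
                          linked: Iterable[EntityReference],
                          ) -> Iterable[tuple[EntityReference, list[str]]]:
        # The link traversal does not include the starting entity, so
        # the file's own replica is emitted explicitly first.
        hub_ids = [file.entity_id]
        yield file, hub_ids
        for entity in linked:
            # Implicit-hub exception: an explicit hub list on a dataset
            # (or project, in HCA) would name every file in the snapshot
            # and cause a high rate of conflicts during replica updates,
            # so it is deliberately left empty.
            implicit = entity.entity_type in ('dataset', 'project')
            yield entity, [] if implicit else hub_ids

A donor linked to many files therefore receives one single-hub replica
per file, and the coordinate-based merge above reassembles the donor's
complete hub list; that is how donors and other non-file entities regain
their replicas.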