diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py
index 8fc5140e6..1c93ae71c 100644
--- a/src/azul/indexer/index_service.py
+++ b/src/azul/indexer/index_service.py
@@ -38,7 +38,6 @@
 from more_itertools import (
     first,
     one,
-    unzip,
 )
 
 from azul import (
@@ -305,16 +304,13 @@ def transform(self,
         contributions = []
         replicas = []
         for transformer in transformers:
-            # The cast is necessary because unzip()'s type stub doesn't
-            # support heterogeneous tuples.
-            transforms = cast(
-                tuple[Iterable[Optional[Contribution]], Iterable[Optional[Replica]]],
-                unzip(transformer.transform(partition))
-            )
-            if transforms:
-                contributions_part, replicas_part = transforms
-                contributions.extend(filter(None, contributions_part))
-                replicas.extend(filter(None, replicas_part))
+            for document in transformer.transform(partition):
+                if isinstance(document, Contribution):
+                    contributions.append(document)
+                elif isinstance(document, Replica):
+                    replicas.append(document)
+                else:
+                    assert False, document
         return contributions, replicas
 
     def create_indices(self, catalog: CatalogName):
diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py
index a5e6a6975..5520ceaa8 100644
--- a/src/azul/indexer/transform.py
+++ b/src/azul/indexer/transform.py
@@ -36,8 +36,6 @@
     JSON,
 )
 
-Transform = tuple[Optional[Contribution], Optional[Replica]]
-
 
 @attr.s(frozen=True, kw_only=True, auto_attribs=True)
 class Transformer(metaclass=ABCMeta):
@@ -54,12 +52,12 @@ def entity_type(cls) -> EntityType:
         raise NotImplementedError
 
     @abstractmethod
-    def replica_type(self, entity: EntityReference) -> str:
+    def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
         """
-        The name of the type of replica emitted by this transformer for a given
-        entity.
-
-        See :py:attr:`Replica.replica_type`
+        A tuple consisting of:
+        1. The name of the type of replica emitted by this transformer for a
+           given entity. See :py:attr:`Replica.replica_type`.
+        2. The contents of the replica for that entity.
         """
         raise NotImplementedError
 
@@ -87,7 +85,7 @@ def estimate(self, partition: BundlePartition) -> int:
         """
 
     @abstractmethod
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
         """
         Return the contributions by the current bundle to the entities it
         contains metadata about. More than one bundle can contribute to a
@@ -125,15 +123,15 @@ def _contribution(self,
                             contents=contents)
 
     def _replica(self,
-                 contents: JSON,
                  entity: EntityReference,
                  hub_ids: list[EntityID]
                  ) -> Replica:
+        replica_type, contents = self._replicate(entity)
         coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
                                          entity=entity)
         return Replica(coordinates=coordinates,
                        version=None,
-                       replica_type=self.replica_type(entity),
+                       replica_type=replica_type,
                        contents=contents,
                        hub_ids=hub_ids)
 
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index 1e2eeef38..f26af3699 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -54,6 +54,7 @@
     EntityReference,
     EntityType,
     FieldTypes,
+    Replica,
     null_bool,
     null_int,
     null_str,
@@ -61,7 +62,6 @@
     pass_thru_json,
 )
 from azul.indexer.transform import (
-    Transform,
     Transformer,
 )
 from azul.plugins.metadata.anvil.bundle import (
@@ -134,9 +134,6 @@ def _search(cls,
 class BaseTransformer(Transformer, metaclass=ABCMeta):
     bundle: AnvilBundle
 
-    def replica_type(self, entity: EntityReference) -> str:
-        return f'anvil_{entity.entity_type}'
-
     @classmethod
     def field_types(cls) -> FieldTypes:
         return {
@@ -168,32 +165,20 @@ def aggregator(cls, entity_type) -> EntityAggregator:
     def estimate(self, partition: BundlePartition) -> int:
         return sum(map(partial(self._contains, partition), self.bundle.entities))
 
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
-        return (
-            self._transform(entity)
-            for entity in self._list_entities()
-            if self._contains(partition, entity)
-        )
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
+        for entity in self._list_entities():
+            if self._contains(partition, entity):
+                yield from self._transform(entity)
 
     def _list_entities(self) -> Iterable[EntityReference]:
         return self.bundle.entities
 
     @abstractmethod
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         raise NotImplementedError
 
-    def _add_replica(self,
-                     contribution: JSON | None,
-                     entity: EntityReference,
-                     hub_ids: list[EntityID]
-                     ) -> Transform:
-        no_replica = not config.enable_replicas or self.entity_type() == 'bundles'
-        return (
-            None if contribution is None else self._contribution(contribution, entity),
-            None if no_replica else self._replica(self.bundle.entities[entity],
-                                                  entity,
-                                                  hub_ids)
-        )
+    def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
+        return f'anvil_{entity.entity_type}', self.bundle.entities[entity]
 
     def _pluralize(self, entity_type: str) -> str:
         if entity_type == 'diagnosis':
@@ -524,7 +509,7 @@ class ActivityTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'activities'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -535,8 +520,10 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, linked['donor']),
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class BiosampleTransformer(BaseTransformer):
@@ -545,7 +532,7 @@ class BiosampleTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'biosamples'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -559,16 +546,19 @@
             donors=self._entities(self._donor, linked['donor']),
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class DiagnosisTransformer(BaseTransformer):
 
-    def _transform(self, entity: EntityReference) -> Transform:
-        files = self._linked_entities(entity)['file']
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(None, entity, hub_ids)
+    def _transform(self, entity: EntityReference) -> Iterable[Replica]:
+        if config.enable_replicas:
+            files = self._linked_entities(entity)['file']
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
     @classmethod
     def entity_type(cls) -> EntityType:
@@ -585,10 +575,9 @@ def _singleton(self) -> EntityReference:
         return EntityReference(entity_type='bundle',
                                entity_id=self.bundle.uuid)
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
         contents = self._contents()
-        hub_ids = [f.entity_id for f in self._entities_by_type['file']]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
 
 
 class DatasetTransformer(SingletonTransformer):
@@ -600,17 +589,19 @@ def entity_type(cls) -> str:
     def _singleton(self) -> EntityReference:
         return self._only_dataset()
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         contents = self._contents()
-        # Every file in a snapshot is linked to that snapshot's singular
-        # dataset, making an explicit list of hub IDs for the dataset both
-        # redundant and impractically large (we observe that for large
-        # snapshots, trying to track this many files in a single data structure
-        # causes a prohibitively high rate of conflicts during replica updates).
-        # Therefore, we leave the hub IDs field empty for datasets and rely on
-        # the tenet that every file is an implicit hub of its parent dataset.
-        hub_ids = []
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
+        if config.enable_replicas:
+            # Every file in a snapshot is linked to that snapshot's singular
+            # dataset, making an explicit list of hub IDs for the dataset both
+            # redundant and impractically large (we observe that for large
+            # snapshots, trying to track this many files in a single data structure
+            # causes a prohibitively high rate of conflicts during replica updates).
+            # Therefore, we leave the hub IDs field empty for datasets and rely on
+            # the tenet that every file is an implicit hub of its parent dataset.
+            hub_ids = []
+            yield self._replica(entity, hub_ids)
 
 
 class DonorTransformer(BaseTransformer):
@@ -619,7 +610,7 @@ class DonorTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'donors'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -633,8 +624,10 @@
             donors=[self._donor(entity)],
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class FileTransformer(BaseTransformer):
@@ -643,7 +636,7 @@ class FileTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'files'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         contents = dict(
             activities=self._entities(self._activity, chain.from_iterable(
@@ -656,8 +649,10 @@
             donors=self._entities(self._donor, linked['donor']),
             files=[self._file(entity)],
         )
-        # The result of the link traversal does not include the starting entity,
-        # so without this step the file itself wouldn't be included in its hubs
-        files = (entity, *linked['file'])
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity)
+        if config.enable_replicas:
+            # The result of the link traversal does not include the starting entity,
+            # so without this step the file itself wouldn't be included in its hubs
+            files = (entity, *linked['file'])
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py
index 1543eadf3..79be27b35 100644
--- a/src/azul/plugins/metadata/hca/indexer/transform.py
+++ b/src/azul/plugins/metadata/hca/indexer/transform.py
@@ -73,6 +73,7 @@
     Nested,
     NullableString,
     PassThrough,
+    Replica,
     null_bool,
     null_datetime,
     null_int,
@@ -82,7 +83,6 @@
     pass_thru_json,
 )
 from azul.indexer.transform import (
-    Transform,
     Transformer,
 )
 from azul.iterators import (
@@ -461,10 +461,6 @@ class BaseTransformer(Transformer, metaclass=ABCMeta):
     bundle: HCABundle
     api_bundle: api.Bundle
 
-    def replica_type(self, entity: EntityReference) -> str:
-        api_entity = self.api_bundle.entities[UUID(entity.entity_id)]
-        return api_entity.schema_name
-
     @classmethod
     def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
         if entity_type == 'files':
@@ -501,22 +497,16 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
         else:
             return SimpleAggregator()
 
-    def _add_replica(self,
-                     contribution: JSON,
-                     entity: Union[api.Entity, DatedEntity],
-                     hub_ids: list[EntityID]
-                     ) -> Transform:
-        entity_ref = EntityReference(entity_id=str(entity.document_id),
-                                     entity_type=self.entity_type())
-        if not config.enable_replicas:
-            replica = None
-        elif self.entity_type() == 'bundles':
-            links = self.bundle.links
-            replica = self._replica(links, entity_ref, hub_ids)
+    def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
+        if entity.entity_type == 'bundles':
+            return 'links', self.bundle.links
         else:
-            assert isinstance(entity, api.Entity), entity
-            replica = self._replica(entity.json, entity_ref, hub_ids)
-        return self._contribution(contribution, entity_ref), replica
+            api_entity = self.api_bundle.entities[UUID(entity.entity_id)]
+            return api_entity.schema_name, api_entity.json
+
+    def _entity_ref(self, entity: api.Entity | DatedEntity) -> EntityReference:
+        return EntityReference(entity_type=self.entity_type(),
+                               entity_id=str(entity.document_id))
 
     def _find_ancestor_samples(self,
                                entity: api.LinkedEntity,
@@ -1400,7 +1390,7 @@ def visit(self, entity: api.Entity) -> None:
 class PartitionedTransformer(BaseTransformer, Generic[ENTITY]):
 
     @abstractmethod
-    def _transform(self, entities: Iterable[ENTITY]) -> Iterable[Transform]:
+    def _transform(self, entities: Iterable[ENTITY]) -> Iterable[Contribution | Replica]:
         """
         Transform the given outer entities into contributions.
         """
@@ -1419,7 +1409,7 @@ def _entities_in(self, partition: BundlePartition) -> Iterator[ENTITY]:
     def estimate(self, partition: BundlePartition) -> int:
         return ilen(self._entities_in(partition))
 
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
         return self._transform(generable(self._entities_in, partition))
 
 
@@ -1432,7 +1422,7 @@ def entity_type(cls) -> str:
     def _entities(self) -> Iterable[api.File]:
         return self.api_bundle.not_stitched(self.api_bundle.files)
 
-    def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]:
+    def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution | Replica]:
         zarr_stores: Mapping[str, list[api.File]] = self.group_zarrs(files)
         for file in files:
             file_name = file.manifest_entry.name
@@ -1471,8 +1461,11 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]:
             additional_contents = self.matrix_stratification_values(file)
             for entity_type, values in additional_contents.items():
                 contents[entity_type].extend(values)
-            hub_ids = list(map(str, visitor.files))
-            yield self._add_replica(contents, file, hub_ids)
+            ref = self._entity_ref(file)
+            yield self._contribution(contents, ref)
+            if config.enable_replicas:
+                hub_ids = list(map(str, visitor.files))
+                yield self._replica(ref, hub_ids)
 
     def matrix_stratification_values(self, file: api.File) -> JSON:
         """
@@ -1544,7 +1537,7 @@ def _entities(self) -> Iterable[api.CellSuspension]:
 
     def _transform(self,
                    cell_suspensions: Iterable[api.CellSuspension]
-                   ) -> Iterable[Contribution]:
+                   ) -> Iterable[Contribution | Replica]:
         for cell_suspension in cell_suspensions:
             samples: dict[str, Sample] = dict()
             self._find_ancestor_samples(cell_suspension, samples)
@@ -1567,8 +1560,11 @@ def _transform(self,
                             ),
                             dates=[self._date(cell_suspension)],
                             projects=[self._project(self._api_project)])
-            hub_ids = list(map(str, visitor.files))
-            yield self._add_replica(contents, cell_suspension, hub_ids)
+            ref = self._entity_ref(cell_suspension)
+            yield self._contribution(contents, ref)
+            if config.enable_replicas:
+                hub_ids = list(map(str, visitor.files))
+                yield self._replica(ref, hub_ids)
 
 
 class SampleTransformer(PartitionedTransformer):
@@ -1592,7 +1588,7 @@ def _entities(self) -> Iterable[Sample]:
             self._find_ancestor_samples(file, samples)
         return samples.values()
 
-    def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]:
+    def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution | Replica]:
         for sample in samples:
            visitor = TransformerVisitor()
            sample.accept(visitor)
@@ -1613,8 +1609,11 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]:
                             ),
                             dates=[self._date(sample)],
                             projects=[self._project(self._api_project)])
-            hub_ids = list(map(str, visitor.files))
-            yield self._add_replica(contents, sample, hub_ids)
+            ref = self._entity_ref(sample)
+            yield self._contribution(contents, ref)
+            if config.enable_replicas:
+                hub_ids = list(map(str, visitor.files))
+                yield self._replica(ref, hub_ids)
 
 
 class BundleAsEntity(DatedEntity):
@@ -1650,11 +1649,11 @@ def _dated_entities(self) -> Iterable[DatedEntity]:
     def estimate(self, partition: BundlePartition) -> int:
         return int(partition.contains(self._singleton_id))
 
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
         if partition.contains(self._singleton_id):
-            yield self._transform()
+            yield from self._transform()
 
-    def _transform(self) -> Transform:
+    def _transform(self) -> Iterable[Contribution | Replica]:
         # Project entities are not explicitly linked in the graph. The mere
         # presence of project metadata in a bundle indicates that all other
         # entities in that bundle belong to that project. Because of that we
@@ -1712,8 +1711,11 @@ def _transform(self) -> Transform:
                         contributed_analyses=contributed_analyses,
                         dates=[self._date(self._singleton_entity())],
                         projects=[self._project(self._api_project)])
-        hub_ids = self._hub_ids(visitor)
-        return self._add_replica(contents, self._singleton_entity(), hub_ids)
+        ref = self._entity_ref(self._singleton_entity())
+        yield self._contribution(contents, ref)
+        if config.enable_replicas:
+            hub_ids = self._hub_ids(visitor)
+            yield self._replica(ref, hub_ids)
 
     @abstractmethod
     def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
@@ -1743,10 +1745,6 @@ class BundleTransformer(SingletonTransformer):
     def _singleton_entity(self) -> DatedEntity:
         return BundleAsEntity(self.api_bundle)
 
-    def replica_type(self, entity: EntityReference) -> str:
-        assert entity.entity_type == self.entity_type(), entity
-        return 'links'
-
     @classmethod
     def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
         if entity_type == 'files':
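
Note: the sketch below is illustrative only and is not part of the change. It uses hypothetical, simplified stand-ins for Contribution, Replica, config.enable_replicas, and a transformer (the real definitions live in azul.indexer.document and azul.indexer.transform). It shows the shape of the new contract: transformers yield one flat stream of documents, replica emission is guarded by the replica flag, and the index service buckets the stream by concrete document type instead of unzipping (Optional[Contribution], Optional[Replica]) tuples and filtering out the None padding.

# Illustrative sketch only: hypothetical stand-ins for the real azul classes.
from dataclasses import dataclass
from typing import Iterable


@dataclass
class Contribution:
    entity_id: str
    contents: dict


@dataclass
class Replica:
    entity_id: str
    replica_type: str
    contents: dict
    hub_ids: list[str]


ENABLE_REPLICAS = True  # stands in for config.enable_replicas


class ExampleTransformer:

    def _replicate(self, entity_id: str) -> tuple[str, dict]:
        # Mirrors Transformer._replicate: a single overridable method now
        # returns both the replica type and the replica contents.
        return 'anvil_biosample', {'biosample_id': entity_id}

    def transform(self, entity_ids: list[str]) -> Iterable[Contribution | Replica]:
        # Transformers yield one flat, heterogeneous stream of documents.
        # An entity that warrants no contribution (or no replica) simply
        # produces fewer documents; there is no None padding to filter out.
        for entity_id in entity_ids:
            yield Contribution(entity_id, {'biosamples': [entity_id]})
            if ENABLE_REPLICAS:
                replica_type, contents = self._replicate(entity_id)
                yield Replica(entity_id, replica_type, contents, hub_ids=[])


def bucket_documents(transformer: ExampleTransformer,
                     entity_ids: list[str],
                     ) -> tuple[list[Contribution], list[Replica]]:
    # Mirrors the loop in IndexService.transform: dispatch on the concrete
    # document type instead of unzipping (contribution, replica) tuples.
    contributions: list[Contribution] = []
    replicas: list[Replica] = []
    for document in transformer.transform(entity_ids):
        if isinstance(document, Contribution):
            contributions.append(document)
        elif isinstance(document, Replica):
            replicas.append(document)
        else:
            assert False, document
    return contributions, replicas


if __name__ == '__main__':
    contributions, replicas = bucket_documents(ExampleTransformer(), ['b1', 'b2'])
    assert len(contributions) == 2 and len(replicas) == 2

One consequence worth noting: a transformer that emits only replicas for an entity, like DiagnosisTransformer above, now just yields fewer documents, which is what made the tuple's None slots and the final filter step unnecessary.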