From d21f842d3df8c19ed650388f786637b35c96174a Mon Sep 17 00:00:00 2001 From: Noah Dove Date: Sat, 15 Apr 2023 20:05:23 -0700 Subject: [PATCH] Push down HCA specifics to a Bundle subclass (#4940) --- src/azul/indexer/__init__.py | 22 +- src/azul/plugins/anvil.py | 89 ++++++++ src/azul/plugins/hca.py | 41 ++++ src/azul/plugins/metadata/anvil/__init__.py | 10 +- .../metadata/anvil/indexer/transform.py | 185 ++++++++-------- src/azul/plugins/metadata/hca/__init__.py | 10 +- .../plugins/metadata/hca/indexer/transform.py | 8 +- .../plugins/repository/canned/__init__.py | 6 +- src/azul/plugins/repository/dss/__init__.py | 6 +- src/azul/plugins/repository/tdr.py | 3 +- .../plugins/repository/tdr_anvil/__init__.py | 198 +++++------------- .../plugins/repository/tdr_hca/__init__.py | 5 +- 12 files changed, 304 insertions(+), 279 deletions(-) create mode 100644 src/azul/plugins/anvil.py create mode 100644 src/azul/plugins/hca.py diff --git a/src/azul/indexer/__init__.py b/src/azul/indexer/__init__.py index 12b9db534..1d7edd1f5 100644 --- a/src/azul/indexer/__init__.py +++ b/src/azul/indexer/__init__.py @@ -30,7 +30,6 @@ from azul.types import ( JSON, MutableJSON, - MutableJSONs, SupportsLessThan, get_generic_type_params, ) @@ -396,23 +395,6 @@ def to_json(self) -> SourcedBundleFQIDJSON: @attr.s(auto_attribs=True, kw_only=True) class Bundle(ABC, Generic[BUNDLE_FQID]): fqid: BUNDLE_FQID - manifest: MutableJSONs - """ - Each item of the `manifest` attribute's value has this shape: - { - 'content-type': 'application/json; dcp-type="metadata/biomaterial"', - 'crc32c': 'fd239631', - 'indexed': True, - 'name': 'cell_suspension_0.json', - 's3_etag': 'aa31c093cc816edb1f3a42e577872ec6', - 'sha1': 'f413a9a7923dee616309e4f40752859195798a5d', - 'sha256': 'ea4c9ed9e53a3aa2ca4b7dffcacb6bbe9108a460e8e15d2b3d5e8e5261fb043e', - 'size': 1366, - 'uuid': '0136ebb4-1317-42a0-8826-502fae25c29f', - 'version': '2019-05-16T162155.020000Z' - } - """ - metadata_files: MutableJSON @property def uuid(self) -> BundleUUID: @@ -432,6 +414,10 @@ def drs_path(self, manifest_entry: JSON) -> Optional[str]: """ raise NotImplementedError + @abstractmethod + def to_json(self) -> MutableJSON: + raise NotImplementedError + class BundlePartition(UUIDPartition['BundlePartition']): """ diff --git a/src/azul/plugins/anvil.py b/src/azul/plugins/anvil.py new file mode 100644 index 000000000..45666c18b --- /dev/null +++ b/src/azul/plugins/anvil.py @@ -0,0 +1,89 @@ +from abc import ( + ABC, +) +from typing import ( + AbstractSet, + Generic, + Iterable, + Optional, + TypeVar, + Union, +) + +import attr +from more_itertools import ( + one, +) + +from azul.indexer import ( + BUNDLE_FQID, + Bundle, +) +from azul.indexer.document import ( + EntityReference, + EntityType, +) +from azul.types import ( + MutableJSON, +) + +# AnVIL snapshots do not use UUIDs for primary/foreign keys. +# This type alias helps us distinguish these keys from the document UUIDs, +# which are drawn from the `datarepo_row_id` column. +# Note that entities from different tables may have the same key, so +# `KeyReference` should be used when mixing keys from different entity types. +Key = str + + +@attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True) +class KeyReference: + key: Key + entity_type: EntityType + + +ENTITY_REF = TypeVar(name='ENTITY_REF', bound=Union[EntityReference, KeyReference]) + + +@attr.s(auto_attribs=True, frozen=True, kw_only=True, order=False) +class Link(Generic[ENTITY_REF]): + inputs: AbstractSet[ENTITY_REF] = attr.ib(factory=frozenset) + activity: Optional[ENTITY_REF] = attr.ib(default=None) + outputs: AbstractSet[ENTITY_REF] = attr.ib(factory=frozenset) + + @property + def all_entities(self) -> AbstractSet[ENTITY_REF]: + return self.inputs | self.outputs | (set() if self.activity is None else {self.activity}) + + def to_json(self) -> MutableJSON: + return { + 'inputs': sorted(map(str, self.inputs)), + 'activity': str(self.activity), + 'outputs': sorted(map(str, self.outputs)) + } + + @classmethod + def merge(cls, links: Iterable['Link']) -> 'Link': + return cls(inputs=frozenset.union(*[link.inputs for link in links]), + activity=one({link.activity for link in links}), + outputs=frozenset.union(*[link.outputs for link in links])) + + def __lt__(self, other: 'Link') -> bool: + if self.activity is None or other.activity is None: + return False + else: + return self.activity < other.activity + + +@attr.s(auto_attribs=True, kw_only=True) +class AnvilBundle(Bundle[BUNDLE_FQID], ABC): + entities: dict[EntityReference, MutableJSON] = attr.ib(factory=dict) + links: set[Link[EntityReference]] = attr.ib(factory=set) + + def to_json(self) -> MutableJSON: + return { + 'entities': { + str(entity_ref): entity + for entity_ref, entity in sorted(self.entities.items()) + }, + 'links': [link.to_json() for link in sorted(self.links)] + } diff --git a/src/azul/plugins/hca.py b/src/azul/plugins/hca.py new file mode 100644 index 000000000..c934dca55 --- /dev/null +++ b/src/azul/plugins/hca.py @@ -0,0 +1,41 @@ +from abc import ( + ABC, +) + +import attr + +from azul.indexer import ( + BUNDLE_FQID, + Bundle, +) +from azul.types import ( + MutableJSON, + MutableJSONs, +) + + +@attr.s(auto_attribs=True, kw_only=True) +class HCABundle(Bundle[BUNDLE_FQID], ABC): + manifest: MutableJSONs + """ + Each item of the `manifest` attribute's value has this shape: + { + 'content-type': 'application/json; dcp-type="metadata/biomaterial"', + 'crc32c': 'fd239631', + 'indexed': True, + 'name': 'cell_suspension_0.json', + 's3_etag': 'aa31c093cc816edb1f3a42e577872ec6', + 'sha1': 'f413a9a7923dee616309e4f40752859195798a5d', + 'sha256': 'ea4c9ed9e53a3aa2ca4b7dffcacb6bbe9108a460e8e15d2b3d5e8e5261fb043e', + 'size': 1366, + 'uuid': '0136ebb4-1317-42a0-8826-502fae25c29f', + 'version': '2019-05-16T162155.020000Z' + } + """ + metadata_files: MutableJSON + + def to_json(self) -> MutableJSON: + return { + 'manifest': self.manifest, + 'metadata': self.metadata_files + } diff --git a/src/azul/plugins/metadata/anvil/__init__.py b/src/azul/plugins/metadata/anvil/__init__.py index 53ee9f391..498f8dc02 100644 --- a/src/azul/plugins/metadata/anvil/__init__.py +++ b/src/azul/plugins/metadata/anvil/__init__.py @@ -6,15 +6,15 @@ Type, ) -from azul.indexer import ( - Bundle, -) from azul.plugins import ( DocumentSlice, ManifestConfig, MetadataPlugin, Sorting, ) +from azul.plugins.anvil import ( + AnvilBundle, +) from azul.plugins.metadata.anvil.indexer.transform import ( ActivityTransformer, BaseTransformer, @@ -42,7 +42,7 @@ ) -class Plugin(MetadataPlugin[Bundle]): +class Plugin(MetadataPlugin[AnvilBundle]): @classmethod def atlas(cls) -> str: @@ -71,7 +71,7 @@ def transformer_types(self) -> Iterable[Type[BaseTransformer]]: FileTransformer, ) - def transformers(self, bundle: Bundle, *, delete: bool) -> Iterable[BaseTransformer]: + def transformers(self, bundle: AnvilBundle, *, delete: bool) -> Iterable[BaseTransformer]: return [ transformer_cls(bundle=bundle, deleted=delete) for transformer_cls in self.transformer_types() diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index 41fb9f28d..173e8ae01 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -15,8 +15,8 @@ ) from typing import ( Callable, + Collection, Iterable, - Mapping, Optional, ) from uuid import ( @@ -29,7 +29,6 @@ ) from azul.indexer import ( - Bundle, BundlePartition, ) from azul.indexer.aggregate import ( @@ -38,7 +37,6 @@ from azul.indexer.document import ( Contribution, ContributionCoordinates, - EntityID, EntityReference, EntityType, FieldTypes, @@ -52,6 +50,10 @@ from azul.indexer.transform import ( Transformer, ) +from azul.plugins.anvil import ( + AnvilBundle, + Link, +) from azul.plugins.metadata.anvil.indexer.aggregate import ( ActivityAggregator, BiosampleAggregator, @@ -60,31 +62,31 @@ DonorAggregator, FileAggregator, ) -from azul.plugins.repository.tdr_hca import ( - EntitiesByType, -) from azul.strings import ( pluralize, ) from azul.types import ( - JSON, - JSONs, MutableJSON, MutableJSONs, ) +EntityRefsByType = dict[EntityType, set[EntityReference]] + @attr.s(auto_attribs=True, kw_only=True, frozen=True) class LinkedEntities: origin: EntityReference - ancestors: EntitiesByType - descendants: EntitiesByType + ancestors: EntityRefsByType + descendants: EntityRefsByType - def __getitem__(self, item: EntityType) -> set[EntityID]: + def __getitem__(self, item: EntityType) -> set[EntityReference]: return self.ancestors[item] | self.descendants[item] @classmethod - def from_links(cls, origin: EntityReference, links: JSONs) -> 'LinkedEntities': + def from_links(cls, + origin: EntityReference, + links: Collection[Link[EntityReference]] + ) -> 'LinkedEntities': return cls(origin=origin, ancestors=cls._search(origin, links, from_='outputs', to='inputs'), descendants=cls._search(origin, links, from_='inputs', to='outputs')) @@ -92,30 +94,30 @@ def from_links(cls, origin: EntityReference, links: JSONs) -> 'LinkedEntities': @classmethod def _search(cls, entity_ref: EntityReference, - links: JSONs, - entities: Optional[EntitiesByType] = None, + links: Collection[Link[EntityReference]], + entities: Optional[EntityRefsByType] = None, *, from_: str, to: str - ) -> EntitiesByType: + ) -> EntityRefsByType: entities = defaultdict(set) if entities is None else entities if entity_ref.entity_type.endswith('activity'): - follow = [one(link for link in links if str(entity_ref) == link['activity'])] + follow = [one(link for link in links if str(entity_ref) == link.activity)] else: - follow = [link for link in links if str(entity_ref) in link[from_]] + follow = [link for link in links if str(entity_ref) in getattr(link, from_)] for link in follow: - for relative in [link['activity'], *link[to]]: + for relative in [link.activity, *getattr(link, to)]: if relative is not None: relative = EntityReference.parse(relative) if relative != entity_ref and relative.entity_id not in entities[relative.entity_type]: - entities[relative.entity_type].add(relative.entity_id) + entities[relative.entity_type].add(relative) cls._search(relative, links, entities, from_=from_, to=to) return entities @attr.s(frozen=True, kw_only=True, auto_attribs=True) class BaseTransformer(Transformer, ABC): - bundle: Bundle + bundle: AnvilBundle deleted: bool @classmethod @@ -147,14 +149,17 @@ def get_aggregator(cls, entity_type) -> EntityAggregator: assert False, entity_type def estimate(self, partition: BundlePartition) -> int: - return sum(map(partial(self._contains, partition), self.bundle.manifest)) + return sum(map(partial(self._contains, partition), self.bundle.entities)) def transform(self, partition: BundlePartition) -> Iterable[Contribution]: - return map(self._transform, - filter(partial(self._contains, partition), self.bundle.manifest)) + return ( + self._transform(entity) + for entity in self.bundle.entities + if self._contains(partition, entity) + ) @abstractmethod - def _transform(self, manifest_entry: JSON) -> Contribution: + def _transform(self, entity: EntityReference) -> Contribution: raise NotImplementedError def _pluralize(self, entity_type: str) -> str: @@ -163,34 +168,21 @@ def _pluralize(self, entity_type: str) -> str: else: return pluralize(entity_type) - def _contains(self, partition: BundlePartition, manifest_entry: JSON) -> bool: + def _contains(self, partition: BundlePartition, entity: EntityReference) -> bool: return ( - self._pluralize(self._entity_type(manifest_entry)).endswith(self.entity_type()) - and partition.contains(UUID(manifest_entry['uuid'])) + entity.entity_type == self.entity_type() + and partition.contains(UUID(entity.entity_id)) ) - def _entity_type(self, manifest_entry: JSON) -> EntityType: - return manifest_entry['name'].split('_')[0] - @cached_property - def _entries_by_entity_id(self) -> Mapping[EntityID, JSON]: - return { - manifest_entry['uuid']: manifest_entry - for manifest_entry in self.bundle.manifest - } - - @cached_property - def _entities_by_type(self) -> EntitiesByType: + def _entities_by_type(self) -> dict[EntityType, set[EntityReference]]: entries = defaultdict(set) - for e in self.bundle.manifest: - entries[self._entity_type(e)].add(e['uuid']) + for e in self.bundle.entities: + entries[e.entity_type].add(e) return entries - def _linked_entities(self, manifest_entry: JSON) -> LinkedEntities: - entity_ref = EntityReference(entity_type=self._entity_type(manifest_entry), - entity_id=manifest_entry['uuid']) - links = self.bundle.metadata_files['links'] - return LinkedEntities.from_links(entity_ref, links) + def _linked_entities(self, entity: EntityReference) -> LinkedEntities: + return LinkedEntities.from_links(entity, self.bundle.links) @classmethod def _entity_types(cls) -> FieldTypes: @@ -294,8 +286,8 @@ def _aggregate_file_types(cls) -> FieldTypes: 'count': pass_thru_int # Added by FileAggregator, ever null } - def _range(self, manifest_entry: JSON, *field_prefixes: str) -> MutableJSON: - metadata = self.bundle.metadata_files[manifest_entry['name']] + def _range(self, entity: EntityReference, *field_prefixes: str) -> MutableJSON: + metadata = self.bundle.entities[entity] def get_bound(field_name: str) -> Optional[float]: val = metadata[field_name] @@ -310,11 +302,10 @@ def get_bound(field_name: str) -> Optional[float]: } def _contribution(self, + entity: EntityReference, contents: MutableJSON, - entity_id: EntityID ) -> Contribution: - entity = EntityReference(entity_type=self.entity_type(), - entity_id=entity_id) + assert entity.entity_type == self.entity_type() coordinates = ContributionCoordinates(entity=entity, bundle=self.bundle.fqid.upcast(), deleted=self.deleted) @@ -324,14 +315,13 @@ def _contribution(self, contents=contents) def _entity(self, - manifest_entry: JSON, + entity: EntityReference, field_types: FieldTypes, **additional_fields ) -> MutableJSON: - metadata = self.bundle.metadata_files[manifest_entry['name']] + metadata = self.bundle.entities[entity] field_values = ChainMap(metadata, - {'document_id': manifest_entry['uuid']}, - manifest_entry, + {'document_id': entity.entity_id}, additional_fields) return { field: field_values[field] @@ -339,22 +329,17 @@ def _entity(self, } def _entities(self, - factory: Callable[[JSON], MutableJSON], - entity_ids: Iterable[EntityID], + factory: Callable[[EntityReference], MutableJSON], + entities: Iterable[EntityReference], ) -> MutableJSONs: - entities = [] - for entity_id in sorted(entity_ids): - manifest_entry = self._entries_by_entity_id[entity_id] - entities.append(factory(manifest_entry)) - return entities + return [factory(entity) for entity in entities] - def _activity(self, manifest_entry: JSON) -> MutableJSON: - activity_table = self._entity_type(manifest_entry) - metadata = self.bundle.metadata_files[manifest_entry['name']] + def _activity(self, activity: EntityReference) -> MutableJSON: + metadata = self.bundle.entities[activity] field_types = self._activity_types() common_fields = { - 'activity_table': activity_table, - 'activity_id': metadata[f'{activity_table}_id'] + 'activity_table': activity.entity_type, + 'activity_id': metadata[f'{activity.entity_type}_id'] } # Activities are unique in that they may not contain every field defined # in their field types due to polymorphism, so we need to pad the field @@ -364,37 +349,37 @@ def _activity(self, manifest_entry: JSON) -> MutableJSON: for field_name, field_type in field_types.items() if field_name not in common_fields } - activity = self._entity(manifest_entry, + activity = self._entity(activity, self._activity_types(), **common_fields, **union_fields) return activity - def _biosample(self, manifest_entry: JSON) -> MutableJSON: - return self._entity(manifest_entry, + def _biosample(self, biosample: EntityReference) -> MutableJSON: + return self._entity(biosample, self._biosample_types(), - **self._range(manifest_entry, 'donor_age_at_collection')) + **self._range(biosample, 'donor_age_at_collection')) - def _dataset(self, manifest_entry: JSON) -> MutableJSON: - return self._entity(manifest_entry, self._dataset_types()) + def _dataset(self, dataset: EntityReference) -> MutableJSON: + return self._entity(dataset, self._dataset_types()) - def _diagnosis(self, manifest_entry: JSON) -> MutableJSON: - return self._entity(manifest_entry, + def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON: + return self._entity(diagnosis, self._diagnosis_types(), - **self._range(manifest_entry, 'diagnosis_age', 'onset_age')) + **self._range(diagnosis, 'diagnosis_age', 'onset_age')) - def _donor(self, manifest_entry: JSON) -> MutableJSON: - return self._entity(manifest_entry, self._donor_types()) + def _donor(self, donor: EntityReference) -> MutableJSON: + return self._entity(donor, self._donor_types()) - def _file(self, manifest_entry: JSON) -> MutableJSON: - metadata = self.bundle.metadata_files[manifest_entry['name']] - return self._entity(manifest_entry, + def _file(self, file: EntityReference) -> MutableJSON: + metadata = self.bundle.entities[file] + return self._entity(file, self._file_types(), size=metadata['file_size']) def _only_dataset(self) -> MutableJSON: - return self._dataset(self._entries_by_entity_id[one(self._entities_by_type['dataset'])]) + return self._dataset(one(self._entities_by_type['dataset'])) _activity_polymorphic_types = { 'activity', @@ -411,17 +396,17 @@ class ActivityTransformer(BaseTransformer): def entity_type(cls) -> str: return 'activities' - def _transform(self, manifest_entry: JSON) -> Contribution: - linked = self._linked_entities(manifest_entry) + def _transform(self, entity: EntityReference) -> Contribution: + linked = self._linked_entities(entity) contents = dict( - activities=[self._activity(manifest_entry)], + activities=[self._activity(entity)], biosamples=self._entities(self._biosample, linked['biosample']), datasets=[self._only_dataset()], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), files=self._entities(self._file, linked['file']), ) - return self._contribution(contents, manifest_entry['uuid']) + return self._contribution(entity, contents) class BiosampleTransformer(BaseTransformer): @@ -430,20 +415,20 @@ class BiosampleTransformer(BaseTransformer): def entity_type(cls) -> str: return 'biosamples' - def _transform(self, manifest_entry: JSON) -> Contribution: - linked = self._linked_entities(manifest_entry) + def _transform(self, entity: EntityReference) -> Contribution: + linked = self._linked_entities(entity) contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] for activity_type in self._activity_polymorphic_types )), - biosamples=[self._biosample(manifest_entry)], + biosamples=[self._biosample(entity)], datasets=[self._only_dataset()], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), files=self._entities(self._file, linked['file']), ) - return self._contribution(contents, manifest_entry['uuid']) + return self._contribution(entity, contents) class DatasetTransformer(BaseTransformer): @@ -452,19 +437,19 @@ class DatasetTransformer(BaseTransformer): def entity_type(cls) -> str: return 'datasets' - def _transform(self, manifest_entry: JSON) -> Contribution: + def _transform(self, entity: EntityReference) -> Contribution: contents = dict( activities=self._entities(self._activity, chain.from_iterable( self._entities_by_type[activity_type] for activity_type in self._activity_polymorphic_types )), biosamples=self._entities(self._biosample, self._entities_by_type['biosample']), - datasets=[self._dataset(manifest_entry)], + datasets=[self._dataset(entity)], diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']), donors=self._entities(self._donor, self._entities_by_type['donor']), files=self._entities(self._file, self._entities_by_type['file']), ) - return self._contribution(contents, manifest_entry['uuid']) + return self._contribution(entity, contents) class DonorTransformer(BaseTransformer): @@ -473,8 +458,8 @@ class DonorTransformer(BaseTransformer): def entity_type(cls) -> str: return 'donors' - def _transform(self, manifest_entry: JSON) -> Contribution: - linked = self._linked_entities(manifest_entry) + def _transform(self, entity: EntityReference) -> Contribution: + linked = self._linked_entities(entity) contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -483,10 +468,10 @@ def _transform(self, manifest_entry: JSON) -> Contribution: biosamples=self._entities(self._biosample, linked['biosample']), datasets=[self._only_dataset()], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), - donors=[self._donor(manifest_entry)], + donors=[self._donor(entity)], files=self._entities(self._file, linked['file']), ) - return self._contribution(contents, manifest_entry['uuid']) + return self._contribution(entity, contents) class FileTransformer(BaseTransformer): @@ -495,8 +480,8 @@ class FileTransformer(BaseTransformer): def entity_type(cls) -> str: return 'files' - def _transform(self, manifest_entry: JSON) -> Contribution: - linked = self._linked_entities(manifest_entry) + def _transform(self, entity: EntityReference) -> Contribution: + linked = self._linked_entities(entity) contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -506,6 +491,6 @@ def _transform(self, manifest_entry: JSON) -> Contribution: datasets=[self._only_dataset()], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=[self._file(manifest_entry)], + files=[self._file(entity)], ) - return self._contribution(contents, manifest_entry['uuid']) + return self._contribution(entity, contents) diff --git a/src/azul/plugins/metadata/hca/__init__.py b/src/azul/plugins/metadata/hca/__init__.py index 03961fdcd..797cdd278 100644 --- a/src/azul/plugins/metadata/hca/__init__.py +++ b/src/azul/plugins/metadata/hca/__init__.py @@ -8,9 +8,6 @@ Type, ) -from azul.indexer import ( - Bundle, -) from azul.indexer.document import ( Aggregate, ) @@ -20,6 +17,9 @@ MetadataPlugin, Sorting, ) +from azul.plugins.hca import ( + HCABundle, +) from azul.plugins.metadata.hca.indexer.aggregate import ( HCAAggregate, ) @@ -53,7 +53,7 @@ ) -class Plugin(MetadataPlugin[Bundle]): +class Plugin(MetadataPlugin[HCABundle]): @classmethod def atlas(cls) -> str: @@ -68,7 +68,7 @@ def transformer_types(self) -> Iterable[Type[BaseTransformer]]: BundleTransformer ) - def transformers(self, bundle: Bundle, *, delete: bool) -> Iterable[BaseTransformer]: + def transformers(self, bundle: HCABundle, *, delete: bool) -> Iterable[BaseTransformer]: api_bundle = api.Bundle(uuid=bundle.uuid, version=bundle.version, manifest=bundle.manifest, diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py index 9eaf5b679..029595d73 100644 --- a/src/azul/plugins/metadata/hca/indexer/transform.py +++ b/src/azul/plugins/metadata/hca/indexer/transform.py @@ -54,7 +54,6 @@ auto, ) from azul.indexer import ( - Bundle, BundlePartition, ) from azul.indexer.aggregate import ( @@ -87,6 +86,9 @@ from azul.openapi import ( schema, ) +from azul.plugins.hca import ( + HCABundle, +) from azul.plugins.metadata.hca.indexer.aggregate import ( CellLineAggregator, CellSuspensionAggregator, @@ -449,7 +451,7 @@ class DatedEntity(Entity, Protocol): @attr.s(frozen=True, kw_only=True, auto_attribs=True) class BaseTransformer(Transformer, metaclass=ABCMeta): - bundle: Bundle + bundle: HCABundle api_bundle: api.Bundle deleted: bool @@ -462,7 +464,7 @@ class BaseTransformer(Transformer, metaclass=ABCMeta): # noinspection PyDataclass,PyUnusedLocal def __init__(self, *, - bundle: Bundle, + bundle: HCABundle, api_bundle: api.Bundle, deleted: bool): ... diff --git a/src/azul/plugins/repository/canned/__init__.py b/src/azul/plugins/repository/canned/__init__.py index e7dcb35d9..1d91d4a63 100644 --- a/src/azul/plugins/repository/canned/__init__.py +++ b/src/azul/plugins/repository/canned/__init__.py @@ -39,7 +39,6 @@ Authentication, ) from azul.indexer import ( - Bundle, SimpleSourceSpec, SourceRef, SourcedBundleFQID, @@ -48,6 +47,9 @@ RepositoryFileDownload, RepositoryPlugin, ) +from azul.plugins.hca import ( + HCABundle, +) from azul.time import ( parse_dcp2_version, ) @@ -75,7 +77,7 @@ class CannedBundleFQID(SourcedBundleFQID[CannedSourceRef]): pass -class CannedBundle(Bundle[CannedBundleFQID]): +class CannedBundle(HCABundle[CannedBundleFQID]): def drs_path(self, manifest_entry: JSON) -> Optional[str]: return None diff --git a/src/azul/plugins/repository/dss/__init__.py b/src/azul/plugins/repository/dss/__init__.py index d6a8f0919..c55b63371 100644 --- a/src/azul/plugins/repository/dss/__init__.py +++ b/src/azul/plugins/repository/dss/__init__.py @@ -39,7 +39,6 @@ aws, ) from azul.indexer import ( - Bundle, SimpleSourceSpec, SourceRef, SourcedBundleFQID, @@ -48,6 +47,9 @@ RepositoryFileDownload, RepositoryPlugin, ) +from azul.plugins.hca import ( + HCABundle, +) from azul.time import ( parse_dcp2_version, ) @@ -80,7 +82,7 @@ class DSSBundleFQID(SourcedBundleFQID[DSSSourceRef]): pass -class DSSBundle(Bundle[DSSBundleFQID]): +class DSSBundle(HCABundle[DSSBundleFQID]): def drs_path(self, manifest_entry: JSON) -> str: file_uuid = manifest_entry['uuid'] diff --git a/src/azul/plugins/repository/tdr.py b/src/azul/plugins/repository/tdr.py index 6644e60aa..6897bc40a 100644 --- a/src/azul/plugins/repository/tdr.py +++ b/src/azul/plugins/repository/tdr.py @@ -1,4 +1,5 @@ from abc import ( + ABC, abstractmethod, ) from collections.abc import ( @@ -71,7 +72,7 @@ class TDRBundleFQID(SourcedBundleFQID[TDRSourceRef]): pass -class TDRBundle(Bundle[TDRBundleFQID]): +class TDRBundle(Bundle[TDRBundleFQID], ABC): def drs_path(self, manifest_entry: JSON) -> Optional[str]: return manifest_entry.get('drs_path') diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py index 566a56bc0..78bc3d608 100644 --- a/src/azul/plugins/repository/tdr_anvil/__init__.py +++ b/src/azul/plugins/repository/tdr_anvil/__init__.py @@ -12,10 +12,8 @@ from typing import ( AbstractSet, Callable, - Iterable, Mapping, Optional, - Union, ) import attr @@ -39,6 +37,12 @@ EntityReference, EntityType, ) +from azul.plugins.anvil import ( + AnvilBundle, + Key, + KeyReference, + Link, +) from azul.plugins.repository.tdr import ( TDRBundle, TDRBundleFQID, @@ -49,8 +53,6 @@ TDRSourceSpec, ) from azul.types import ( - AnyMutableJSON, - JSON, MutableJSON, MutableJSONs, ) @@ -60,59 +62,11 @@ log = logging.getLogger(__name__) -# AnVIL snapshots do not use UUIDs for primary/foreign keys. -# This type alias helps us distinguish these keys from the document UUIDs, -# which are drawn from the `datarepo_row_id` column. -# Note that entities from different tables may have the same key, so -# `KeyReference` should be used when mixing keys from different entity types. -Key = str - - -@attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True) -class KeyReference: - key: Key - entity_type: EntityType - - Keys = AbstractSet[KeyReference] MutableKeys = set[KeyReference] KeysByType = dict[EntityType, AbstractSet[Key]] MutableKeysByType = dict[EntityType, set[Key]] - - -@attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True) -class Link: - inputs: Keys - activity: Optional[KeyReference] - outputs: Keys - - @property - def all_entities(self) -> Keys: - return self.inputs | self.outputs | (set() if self.activity is None else {self.activity}) - - @classmethod - def create(cls, - *, - inputs: Union[KeyReference, Iterable[KeyReference]], - outputs: Union[KeyReference, Iterable[KeyReference]], - activity: Optional[KeyReference] = None - ) -> 'Link': - if isinstance(inputs, KeyReference): - inputs = (inputs,) - if isinstance(outputs, KeyReference): - outputs = (outputs,) - return cls(inputs=frozenset(inputs), - outputs=frozenset(outputs), - activity=activity) - - @classmethod - def merge(cls, links: Iterable['Link']) -> 'Link': - return cls(inputs=frozenset.union(*[link.inputs for link in links]), - activity=one({link.activity for link in links}), - outputs=frozenset.union(*[link.outputs for link in links])) - - -Links = set[Link] +KeyLinks = set[Link[KeyReference]] class BundleEntityType(Enum): @@ -129,66 +83,26 @@ def to_json(self) -> SourcedBundleFQIDJSON: entity_type=self.entity_type.value) -class TDRAnvilBundle(TDRBundle): +class TDRAnvilBundle(AnvilBundle[AnvilBundleFQID], TDRBundle): def add_entity(self, entity: EntityReference, version: str, row: MutableJSON ) -> None: - pk_column = entity.entity_type + '_id' - self._add_entity( - manifest_entry={ - 'uuid': entity.entity_id, - 'version': version, - 'name': f'{entity.entity_type}_{row[pk_column]}', - 'indexed': True, - 'crc32': '', - 'sha256': '', - **( - {'drs_path': self._parse_drs_uri(row.get('file_ref'))} - if entity.entity_type == 'file' else {} - ) - }, - metadata=row - ) + assert entity not in self.entities + self.entities[entity] = row def add_links(self, - links: Links, + links: KeyLinks, entities_by_key: Mapping[KeyReference, EntityReference]) -> None: - def link_sort_key(link: JSON): - return link['activity'] or '', link['inputs'], link['outputs'] - - def key_ref_to_entity_ref(key_ref: KeyReference) -> str: - return str(entities_by_key[key_ref]) - - self._add_entity( - manifest_entry={ - 'uuid': self.fqid.uuid, - 'version': self.fqid.version, - 'name': 'links', - 'indexed': True - }, - metadata=sorted(( - { - 'inputs': sorted(map(key_ref_to_entity_ref, link.inputs)), - 'activity': None if link.activity is None else key_ref_to_entity_ref(link.activity), - 'outputs': sorted(map(key_ref_to_entity_ref, link.outputs)) - } - for link in links - ), key=link_sort_key) + self.links.update( + Link(inputs={entities_by_key[input] for input in link.inputs}, + activity=None if link.activity is None else entities_by_key[link.activity], + outputs={entities_by_key[output] for output in link.outputs}) + for link in links ) - def _add_entity(self, - *, - manifest_entry: MutableJSON, - metadata: AnyMutableJSON - ) -> None: - name = manifest_entry['name'] - assert name not in self.metadata_files, name - self.manifest.append(manifest_entry) - self.metadata_files[name] = metadata - def _parse_drs_uri(self, file_ref: Optional[str]) -> Optional[str]: if file_ref is None: return None @@ -298,10 +212,10 @@ def _primary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle: bundle_entity = self._bundle_entity(bundle_fqid) keys: MutableKeys = {bundle_entity} - links: Links = set() + links: KeyLinks = set() for method in [self._follow_downstream, self._follow_upstream]: - method: Callable[[TDRSourceSpec, KeysByType], Links] + method: Callable[[TDRSourceSpec, KeysByType], KeyLinks] n = len(keys) frontier: Keys = keys while frontier: @@ -320,7 +234,7 @@ def _primary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle: len(keys), bundle_fqid.uuid, arg) self._simplify_links(links) - result = TDRAnvilBundle(fqid=bundle_fqid, manifest=[], metadata_files={}) + result = TDRAnvilBundle(fqid=bundle_fqid) entities_by_key: dict[KeyReference, EntityReference] = {} for entity_type, typed_keys in sorted(keys_by_type.items()): pk_column = entity_type + '_id' @@ -339,7 +253,7 @@ def _supplementary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle: self.datarepo_row_uuid_version) source = bundle_fqid.source.spec bundle_entity_type = bundle_fqid.entity_type.value - result = TDRAnvilBundle(fqid=bundle_fqid, manifest=[], metadata_files={}) + result = TDRAnvilBundle(fqid=bundle_fqid) columns = self._columns(bundle_entity_type) bundle_entity = dict(one(self._run_sql(f''' SELECT {', '.join(sorted(columns))} @@ -362,8 +276,8 @@ def _supplementary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle: key_ref = KeyReference(key=row[entity_type + '_id'], entity_type=entity_type) entities_by_key[key_ref] = entity_ref result.add_entity(entity_ref, self._version, row) - link_args[arg] = key_ref - result.add_links({Link.create(**link_args)}, entities_by_key) + link_args[arg] = {key_ref} + result.add_links({Link(**link_args)}, entities_by_key) return result def _bundle_entity(self, bundle_fqid: AnvilBundleFQID) -> KeyReference: @@ -395,7 +309,7 @@ def _consolidate_by_type(self, entities: Keys) -> MutableKeysByType: result[e.entity_type].add(e.key) return result - def _simplify_links(self, links: Links) -> None: + def _simplify_links(self, links: KeyLinks) -> None: grouped_links = defaultdict(set) for link in links: grouped_links[link.activity].add(link) @@ -407,7 +321,7 @@ def _simplify_links(self, links: Links) -> None: def _follow_upstream(self, source: TDRSourceSpec, entities: KeysByType - ) -> Links: + ) -> KeyLinks: return set.union( self._upstream_from_files(source, entities['file']), self._upstream_from_biosamples(source, entities['biosample']), @@ -443,7 +357,7 @@ def _follow_upstream(self, def _follow_downstream(self, source: TDRSourceSpec, entities: KeysByType - ) -> Links: + ) -> KeyLinks: return set.union( self._downstream_from_biosamples(source, entities['biosample']), self._downstream_from_files(source, entities['file']) @@ -452,24 +366,24 @@ def _follow_downstream(self, def _upstream_from_biosamples(self, source: TDRSourceSpec, biosample_ids: AbstractSet[Key] - ) -> Links: + ) -> KeyLinks: if biosample_ids: rows = self._run_sql(f''' SELECT b.biosample_id, b.donor_id, b.part_of_dataset_id FROM {backtick(self._full_table_name(source, 'biosample'))} AS b WHERE b.biosample_id IN ({', '.join(map(repr, biosample_ids))}) ''') - result: Links = set() + result: KeyLinks = set() for row in rows: downstream_ref = KeyReference(entity_type='biosample', key=row['biosample_id']) - result.add(Link.create(outputs=downstream_ref, - inputs=KeyReference(entity_type='dataset', - key=one(row['part_of_dataset_id'])))) + result.add(Link(outputs={downstream_ref}, + inputs={KeyReference(entity_type='dataset', + key=one(row['part_of_dataset_id']))})) for donor_id in row['donor_id']: - result.add(Link.create(outputs=downstream_ref, - inputs=KeyReference(entity_type='donor', - key=donor_id))) + result.add(Link(outputs={downstream_ref}, + inputs={KeyReference(entity_type='donor', + key=donor_id)})) return result else: return set() @@ -477,7 +391,7 @@ def _upstream_from_biosamples(self, def _upstream_from_files(self, source: TDRSourceSpec, file_ids: AbstractSet[Key] - ) -> Links: + ) -> KeyLinks: if file_ids: rows = self._run_sql(f''' WITH file AS ( @@ -531,18 +445,18 @@ def _upstream_from_files(self, ON f.file_id IN UNNEST(a.generated_file_id) ''') return { - Link.create( + Link( activity=KeyReference(entity_type=row['activity_table'], key=row['activity_id']), # The generated link is not a complete representation of the # upstream activity because it does not include generated files # that are not ancestors of the downstream file - outputs=KeyReference(entity_type='file', key=row['generated_file_id']), - inputs=[ + outputs={KeyReference(entity_type='file', key=row['generated_file_id'])}, + inputs={ KeyReference(entity_type=entity_type, key=key) for entity_type, column in [('file', 'uses_file_id'), ('biosample', 'uses_biosample_id')] for key in row[column] - ] + } ) for row in rows } @@ -552,7 +466,7 @@ def _upstream_from_files(self, def _diagnoses_from_donors(self, source: TDRSourceSpec, donor_ids: AbstractSet[Key] - ) -> Links: + ) -> KeyLinks: if donor_ids: rows = self._run_sql(f''' SELECT dgn.donor_id, dgn.diagnosis_id @@ -560,9 +474,9 @@ def _diagnoses_from_donors(self, WHERE dgn.donor_id IN ({', '.join(map(repr, donor_ids))}) ''') return { - Link.create(inputs={KeyReference(key=row['diagnosis_id'], entity_type='diagnosis')}, - outputs={KeyReference(key=row['donor_id'], entity_type='donor')}, - activity=None) + Link(inputs={KeyReference(key=row['diagnosis_id'], entity_type='diagnosis')}, + outputs={KeyReference(key=row['donor_id'], entity_type='donor')}, + activity=None) for row in rows } else: @@ -571,7 +485,7 @@ def _diagnoses_from_donors(self, def _downstream_from_biosamples(self, source: TDRSourceSpec, biosample_ids: AbstractSet[Key], - ) -> Links: + ) -> KeyLinks: if biosample_ids: rows = self._run_sql(f''' WITH activities AS ( @@ -605,12 +519,12 @@ def _downstream_from_biosamples(self, WHERE biosample_id IN ({', '.join(map(repr, biosample_ids))}) ''') return { - Link.create(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')}, - outputs=[ - KeyReference(key=output_id, entity_type='file') - for output_id in row['generated_file_id'] - ], - activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) + Link(inputs={KeyReference(key=row['biosample_id'], entity_type='biosample')}, + outputs={ + KeyReference(key=output_id, entity_type='file') + for output_id in row['generated_file_id'] + }, + activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) for row in rows } else: @@ -619,7 +533,7 @@ def _downstream_from_biosamples(self, def _downstream_from_files(self, source: TDRSourceSpec, file_ids: AbstractSet[Key] - ) -> Links: + ) -> KeyLinks: if file_ids: rows = self._run_sql(f''' WITH activities AS ( @@ -651,12 +565,12 @@ def _downstream_from_files(self, WHERE used_file_id IN ({', '.join(map(repr, file_ids))}) ''') return { - Link.create(inputs=KeyReference(key=row['used_file_id'], entity_type='file'), - outputs=[ - KeyReference(key=file_id, entity_type='file') - for file_id in row['generated_file_id'] - ], - activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) + Link(inputs={KeyReference(key=row['used_file_id'], entity_type='file')}, + outputs={ + KeyReference(key=file_id, entity_type='file') + for file_id in row['generated_file_id'] + }, + activity=KeyReference(key=row['activity_id'], entity_type=row['activity_table'])) for row in rows } else: diff --git a/src/azul/plugins/repository/tdr_hca/__init__.py b/src/azul/plugins/repository/tdr_hca/__init__.py index 67e78245b..98c9dac5f 100644 --- a/src/azul/plugins/repository/tdr_hca/__init__.py +++ b/src/azul/plugins/repository/tdr_hca/__init__.py @@ -50,6 +50,9 @@ EntityReference, EntityType, ) +from azul.plugins.hca import ( + HCABundle, +) from azul.plugins.repository.tdr import ( TDRBundle, TDRBundleFQID, @@ -172,7 +175,7 @@ def extract_field(field: attr.Attribute) -> tuple[str, Any]: return cls(**dict(map(extract_field, attr.fields(cls)))) -class TDRHCABundle(TDRBundle): +class TDRHCABundle(HCABundle[TDRBundleFQID], TDRBundle): def add_entity(self, *,