Commit

Refactor transformers
nadove-ucsc committed Sep 25, 2024
1 parent fdaabd8 commit 9205a80
Showing 4 changed files with 110 additions and 126 deletions.
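In brief, for readers skimming the diff: Transformer.transform used to return an iterable of (Contribution | None, Replica | None) pairs (the Transform type alias), which the index service had to unzip, cast, and filter. After this commit, transformers yield a flat stream of Contribution and Replica documents, and the index service partitions that stream by type. A minimal sketch of the new contract, using simplified stand-in classes rather than the real Azul document types (field values below are arbitrary):

from dataclasses import dataclass
from typing import Iterable, Union


# Simplified stand-ins for the real Azul document classes.
@dataclass
class Contribution:
    entity_id: str
    contents: dict


@dataclass
class Replica:
    entity_id: str
    contents: dict


def transform() -> Iterable[Union[Contribution, Replica]]:
    # Transformers are now generators that emit documents one at a time,
    # instead of returning (Contribution | None, Replica | None) pairs.
    yield Contribution(entity_id='42', contents={'files': []})
    yield Replica(entity_id='42', contents={'size': 1024})


contributions, replicas = [], []
for document in transform():
    # The index service splits the flat stream by document type.
    if isinstance(document, Contribution):
        contributions.append(document)
    elif isinstance(document, Replica):
        replicas.append(document)
    else:
        assert False, document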
18 changes: 7 additions & 11 deletions src/azul/indexer/index_service.py
@@ -38,7 +38,6 @@
 from more_itertools import (
     first,
     one,
-    unzip,
 )
 
 from azul import (
@@ -305,16 +304,13 @@ def transform(self,
         contributions = []
         replicas = []
         for transformer in transformers:
-            # The cast is necessary because unzip()'s type stub doesn't
-            # support heterogeneous tuples.
-            transforms = cast(
-                tuple[Iterable[Optional[Contribution]], Iterable[Optional[Replica]]],
-                unzip(transformer.transform(partition))
-            )
-            if transforms:
-                contributions_part, replicas_part = transforms
-                contributions.extend(filter(None, contributions_part))
-                replicas.extend(filter(None, replicas_part))
+            for document in transformer.transform(partition):
+                if isinstance(document, Contribution):
+                    contributions.append(document)
+                elif isinstance(document, Replica):
+                    replicas.append(document)
+                else:
+                    assert False, document
         return contributions, replicas
 
     def create_indices(self, catalog: CatalogName):
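For context on what the deleted branch was working around: more_itertools.unzip types its result as a homogeneous tuple of iterators (hence the cast) and returns an empty tuple for empty input (hence the `if transforms:` guard). A small demonstration of both quirks, assuming only that more-itertools is installed:

from more_itertools import unzip

pairs = [('contribution-1', None), (None, 'replica-1')]
contributions, replicas = unzip(pairs)
print(list(contributions))  # ['contribution-1', None]
print(list(replicas))       # [None, 'replica-1']

# The reason for the removed `if transforms:` guard: unzipping an empty
# iterable returns (), not a pair of empty iterators.
print(unzip([]))  # ()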
21 changes: 10 additions & 11 deletions src/azul/indexer/transform.py
@@ -36,8 +36,6 @@
     JSON,
 )
 
-Transform = tuple[Optional[Contribution], Optional[Replica]]
-
 
 @attr.s(frozen=True, kw_only=True, auto_attribs=True)
 class Transformer(metaclass=ABCMeta):
@@ -54,12 +52,12 @@ def entity_type(cls) -> EntityType:
         raise NotImplementedError
 
     @abstractmethod
-    def replica_type(self, entity: EntityReference) -> str:
+    def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
         """
-        The name of the type of replica emitted by this transformer for a given
-        entity.
-
-        See :py:attr:`Replica.replica_type`
+        A tuple consisting of:
+        1. The name of the type of replica emitted by this transformer for a
+           given entity. See :py:attr:`Replica.replica_type`.
+        2. The contents of the replica for that entity.
         """
         raise NotImplementedError
@@ -87,7 +85,7 @@ def estimate(self, partition: BundlePartition) -> int:
         """
 
     @abstractmethod
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
         """
         Return the contributions by the current bundle to the entities it
         contains metadata about. More than one bundle can contribute to a
@@ -114,8 +112,9 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
 
     def _contribution(self,
                       contents: JSON,
-                      entity: EntityReference
+                      entity_id: EntityID
                       ) -> Contribution:
+        entity = EntityReference(entity_type=self.entity_type(), entity_id=entity_id)
         coordinates = ContributionCoordinates(entity=entity,
                                               bundle=self.bundle.fqid.upcast(),
                                               deleted=self.deleted)
@@ -125,15 +124,15 @@ def _contribution(self,
                             contents=contents)
 
     def _replica(self,
-                 contents: JSON,
                  entity: EntityReference,
                  hub_ids: list[EntityID]
                  ) -> Replica:
+        replica_type, contents = self._replicate(entity)
         coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
                                          entity=entity)
         return Replica(coordinates=coordinates,
                        version=None,
-                       replica_type=self.replica_type(entity),
+                       replica_type=replica_type,
                        contents=contents,
                        hub_ids=hub_ids)
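The net effect in this file: _replica is now a template method that derives both the replica type and the contents from the entity through the new abstract _replicate hook, so callers pass just the entity and its hub IDs. A hedged sketch of that interplay, with plain strings and dicts standing in for EntityReference, ReplicaCoordinates, and the json_hash helper:

import hashlib
import json
from abc import ABC, abstractmethod


class TransformerSketch(ABC):

    @abstractmethod
    def _replicate(self, entity: str) -> tuple[str, dict]:
        """
        Return (replica_type, contents) for the given entity.
        """
        raise NotImplementedError

    def _replica(self, entity: str, hub_ids: list[str]) -> dict:
        # The base class derives type and contents through the hook ...
        replica_type, contents = self._replicate(entity)
        content_hash = hashlib.sha256(
            json.dumps(contents, sort_keys=True).encode()
        ).hexdigest()
        return dict(content_hash=content_hash,
                    replica_type=replica_type,
                    contents=contents,
                    hub_ids=hub_ids)


class AnvilSketch(TransformerSketch):

    def __init__(self, entities: dict[str, dict]):
        self.entities = entities

    def _replicate(self, entity: str) -> tuple[str, dict]:
        # ... and a subclass only states what a replica of an entity looks
        # like, mirroring BaseTransformer._replicate in the AnVIL plugin below.
        return f'anvil_{entity}', self.entities[entity]


sketch = AnvilSketch({'biosample/b1': {'donor_age': 42}})
print(sketch._replica('biosample/b1', hub_ids=['file/f1']))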
118 changes: 50 additions & 68 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -54,14 +54,14 @@
     EntityReference,
     EntityType,
     FieldTypes,
+    Replica,
     null_bool,
     null_int,
     null_str,
     pass_thru_int,
     pass_thru_json,
 )
 from azul.indexer.transform import (
-    Transform,
     Transformer,
 )
 from azul.plugins.metadata.anvil.bundle import (
@@ -134,9 +134,6 @@ def _search(cls,
 class BaseTransformer(Transformer, metaclass=ABCMeta):
     bundle: AnvilBundle
 
-    def replica_type(self, entity: EntityReference) -> str:
-        return f'anvil_{entity.entity_type}'
-
     @classmethod
     def field_types(cls) -> FieldTypes:
         return {
@@ -168,32 +165,20 @@ def aggregator(cls, entity_type) -> EntityAggregator:
     def estimate(self, partition: BundlePartition) -> int:
         return sum(map(partial(self._contains, partition), self.bundle.entities))
 
-    def transform(self, partition: BundlePartition) -> Iterable[Transform]:
-        return (
-            self._transform(entity)
-            for entity in self._list_entities()
-            if self._contains(partition, entity)
-        )
+    def transform(self, partition: BundlePartition) -> Iterable[Contribution | Replica]:
+        for entity in self._list_entities():
+            if self._contains(partition, entity):
+                yield from self._transform(entity)
 
     def _list_entities(self) -> Iterable[EntityReference]:
        return self.bundle.entities
 
     @abstractmethod
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
        raise NotImplementedError
 
-    def _add_replica(self,
-                     contribution: JSON | None,
-                     entity: EntityReference,
-                     hub_ids: list[EntityID]
-                     ) -> Transform:
-        no_replica = not config.enable_replicas or self.entity_type() == 'bundles'
-        return (
-            None if contribution is None else self._contribution(contribution, entity),
-            None if no_replica else self._replica(self.bundle.entities[entity],
-                                                  entity,
-                                                  hub_ids)
-        )
+    def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
+        return f'anvil_{entity.entity_type}', self.bundle.entities[entity]
 
     def _pluralize(self, entity_type: str) -> str:
         if entity_type == 'diagnosis':
@@ -335,19 +320,6 @@ def get_bound(field_name: str) -> Optional[float]:
             for field_prefix in field_prefixes
         }
 
-    def _contribution(self,
-                      contents: JSON,
-                      entity: EntityReference,
-                      ) -> Contribution:
-        # The entity type is used to determine the index name.
-        # All activities go into the same index, regardless of their polymorphic type.
-        # Index names use plural forms.
-        entity_type = pluralize('activity'
-                                if entity.entity_type.endswith('activity') else
-                                entity.entity_type)
-        entity = attr.evolve(entity, entity_type=entity_type)
-        return super()._contribution(contents, entity)
-
     def _entity(self,
                 entity: EntityReference,
                 field_types: FieldTypes,
@@ -524,7 +496,7 @@ class ActivityTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'activities'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -535,8 +507,10 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, linked['donor']),
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class BiosampleTransformer(BaseTransformer):
@@ -545,7 +519,7 @@ class BiosampleTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'biosamples'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -559,16 +533,19 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, linked['donor']),
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class DiagnosisTransformer(BaseTransformer):
 
-    def _transform(self, entity: EntityReference) -> Transform:
-        files = self._linked_entities(entity)['file']
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(None, entity, hub_ids)
+    def _transform(self, entity: EntityReference) -> Iterable[Replica]:
+        if config.enable_replicas:
+            files = self._linked_entities(entity)['file']
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
     @classmethod
     def entity_type(cls) -> EntityType:
@@ -585,10 +562,9 @@ def _singleton(self) -> EntityReference:
         return EntityReference(entity_type='bundle',
                                entity_id=self.bundle.uuid)
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution]:
         contents = self._contents()
-        hub_ids = [f.entity_id for f in self._entities_by_type['file']]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
 
 
 class DatasetTransformer(SingletonTransformer):
@@ -600,17 +576,19 @@ def entity_type(cls) -> str:
     def _singleton(self) -> EntityReference:
         return self._only_dataset()
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         contents = self._contents()
-        # Every file in a snapshot is linked to that snapshot's singular
-        # dataset, making an explicit list of hub IDs for the dataset both
-        # redundant and impractically large (we observe that for large
-        # snapshots, trying to track this many files in a single data structure
-        # causes a prohibitively high rate of conflicts during replica updates).
-        # Therefore, we leave the hub IDs field empty for datasets and rely on
-        # the tenet that every file is an implicit hub of its parent dataset.
-        hub_ids = []
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
+        if config.enable_replicas:
+            # Every file in a snapshot is linked to that snapshot's singular
+            # dataset, making an explicit list of hub IDs for the dataset both
+            # redundant and impractically large (we observe that for large
+            # snapshots, trying to track this many files in a single data structure
+            # causes a prohibitively high rate of conflicts during replica updates).
+            # Therefore, we leave the hub IDs field empty for datasets and rely on
+            # the tenet that every file is an implicit hub of its parent dataset.
+            hub_ids = []
+            yield self._replica(entity, hub_ids)
 
 
 class DonorTransformer(BaseTransformer):
@@ -619,7 +597,7 @@ class DonorTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'donors'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         files = linked['file']
         contents = dict(
@@ -633,8 +611,10 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=[self._donor(entity)],
             files=self._entities(self._file, files),
         )
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
+        if config.enable_replicas:
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
 
 
 class FileTransformer(BaseTransformer):
@@ -643,7 +623,7 @@ class FileTransformer(BaseTransformer):
     def entity_type(cls) -> str:
         return 'files'
 
-    def _transform(self, entity: EntityReference) -> Transform:
+    def _transform(self, entity: EntityReference) -> Iterable[Contribution | Replica]:
         linked = self._linked_entities(entity)
         contents = dict(
             activities=self._entities(self._activity, chain.from_iterable(
@@ -656,8 +636,10 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, linked['donor']),
             files=[self._file(entity)],
         )
-        # The result of the link traversal does not include the starting entity,
-        # so without this step the file itself wouldn't be included in its hubs
-        files = (entity, *linked['file'])
-        hub_ids = [f.entity_id for f in files]
-        return self._add_replica(contents, entity, hub_ids)
+        yield self._contribution(contents, entity.entity_id)
+        if config.enable_replicas:
+            # The result of the link traversal does not include the starting entity,
+            # so without this step the file itself wouldn't be included in its hubs
+            files = (entity, *linked['file'])
+            hub_ids = [f.entity_id for f in files]
+            yield self._replica(entity, hub_ids)
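A consequence of the generator-based _transform worth noting: a generator whose body never reaches a yield produces an empty iterator, so DiagnosisTransformer emits nothing when replicas are disabled, and the old bundles special case in _add_replica (no_replica = ... or self.entity_type() == 'bundles') is now expressed by the bundle transformer simply not yielding a replica. A toy illustration of the gating pattern, with a plain module-level boolean standing in for config.enable_replicas:

from typing import Iterable

enable_replicas = False  # stand-in for config.enable_replicas


def transform_diagnosis(entity_id: str) -> Iterable[str]:
    # With replicas disabled, this generator never yields, so the
    # diagnosis transformer contributes no documents at all.
    if enable_replicas:
        yield f'anvil_diagnosis replica for {entity_id}'


print(list(transform_diagnosis('d1')))  # []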