Skip to content

Commit

Permalink
Refactor avro_pfb.py
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Apr 4, 2024
1 parent e4f9d3b commit f8b19e7
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions src/azul/service/avro_pfb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from typing import (
ClassVar,
MutableSet,
Self,
Sequence,
)
from uuid import (
UUID,
Expand Down Expand Up @@ -123,9 +125,9 @@ def add_doc(self, doc: JSON):
for entity in sorted(entities, key=itemgetter('document_id')):
if entity_type != self.entity_type:
_inject_reference_handover_values(entity, doc)
pfb_entity = PFBEntity.from_json(name=entity_type,
object_=entity,
schema=self.schema)
pfb_entity = PFBEntity.for_transform(name=entity_type,
object_=entity,
schema=self.schema)
if pfb_entity not in self._entities:
self._entities[pfb_entity] = set()
file_relations.add(PFBRelation.to_entity(pfb_entity))
Expand All @@ -134,9 +136,9 @@ def add_doc(self, doc: JSON):
for entity in chain([file_entity], related_files):
_inject_reference_handover_values(entity, doc)
# File entities are assumed to be unique
pfb_entity = PFBEntity.from_json(name=self.entity_type,
object_=entity,
schema=self.schema)
pfb_entity = PFBEntity.for_transform(name=self.entity_type,
object_=entity,
schema=self.schema)
assert pfb_entity not in self._entities
# Terra streams PFBs and requires entities be defined before they are
# referenced. Thus we add the file entity after all the entities
Expand Down Expand Up @@ -172,6 +174,7 @@ def __attrs_post_init__(self):
@classmethod
def from_json(cls,
name: str,
ids: Sequence[str],
object_: MutableJSON,
schema: JSON
) -> 'PFBEntity':
Expand All @@ -180,14 +183,21 @@ def from_json(cls,
entities by comparing their IDs.
"""
cls._add_missing_fields(name, object_, schema)
ids = object_['document_id']
# document_id is an array unless the inner entity type matches the
# outer entity type
ids = sorted(ids) if isinstance(ids, list) else [ids]
id_ = uuid5(cls.namespace_uuid, _reversible_join('_', ids))
id_ = _reversible_join('_', ids)
id_ = uuid5(cls.namespace_uuid, id_)
id_ = _reversible_join('.', map(str, (name, id_, len(ids))))
return cls(id=id_, name=name, object=object_)

@classmethod
def for_transform(cls, name: str, object_: MutableJSON, schema: JSON) -> Self:
ids = object_['document_id']
return cls.from_json(name=name,
# document_id is an array unless the inner entity type matches the
# outer entity type
ids=sorted(ids) if isinstance(ids, list) else [ids],
object_=object_,
schema=schema)

@classmethod
def _add_missing_fields(cls, name: str, object_: MutableJSON, schema):
"""
Expand All @@ -207,7 +217,7 @@ def _add_missing_fields(cls, name: str, object_: MutableJSON, schema):
if isinstance(field_type, list):
assert 'null' in field_type, field
default_value = None
elif field_type['type'] == 'array':
elif isinstance(field_type, dict) and field_type['type'] == 'array':
if isinstance(field_type['items'], dict):
assert field_type['items']['type'] in ('record', 'array'), field
default_value = []
Expand Down Expand Up @@ -250,7 +260,7 @@ def to_entity(cls, entity: PFBEntity) -> 'PFBRelation':
return cls(dst_id=entity.id, dst_name=entity.name)


def pfb_metadata_entity(field_types: FieldTypes):
def pfb_metadata_entity(entity_types: Iterable[str]) -> MutableJSON:
"""
The Metadata entity encodes the possible relationships between tables.
Expand All @@ -271,7 +281,7 @@ def pfb_metadata_entity(field_types: FieldTypes):
'name': 'files'
}],
'properties': []
} for entity_type in field_types.keys()
} for entity_type in entity_types
],
'misc': {}
}
Expand Down

0 comments on commit f8b19e7

Please sign in to comment.