Create PFB-based verbatim manifest format (#6040)
nadove-ucsc committed Apr 2, 2024
1 parent 7b09f0a commit 0ddacc3
Showing 3 changed files with 123 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/azul/plugins/__init__.py
@@ -134,6 +134,7 @@ class ManifestFormat(Enum):
terra_pfb = 'terra.pfb'
curl = 'curl'
verbatim_jsonl = 'verbatim.jsonl'
verbatim_pfb = 'verbatim.pfb'


T = TypeVar('T', bound='Plugin')
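A short illustration (not part of the diff): the new member follows the existing pattern of using the format's wire value, so a client-supplied format string resolves to it through ordinary Enum value lookup.

    # Hypothetical usage sketch; only the value lookup itself is implied by the
    # enum definition above.
    from azul.plugins import ManifestFormat

    assert ManifestFormat('verbatim.pfb') is ManifestFormat.verbatim_pfb
    assert ManifestFormat.verbatim_pfb.value == 'verbatim.pfb'
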
94 changes: 90 additions & 4 deletions src/azul/service/avro_pfb.py
@@ -13,6 +13,7 @@
itemgetter,
)
from typing import (
AbstractSet,
ClassVar,
MutableSet,
Self,
@@ -198,6 +199,13 @@ def for_transform(cls, name: str, object_: MutableJSON, schema: JSON) -> Self:
object_=object_,
schema=schema)

@classmethod
def for_replica(cls, object_: MutableJSON, schema: JSON) -> Self:
return cls.from_json(name=object_['replica_type'],
ids=[object_['entity_id']],
object_=object_,
schema=schema)
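
    # Illustration only, not part of the commit: unlike for_transform, this
    # expects the full replica document rather than just its contents. A
    # hypothetical replica document would be shaped like
    #
    #     {'replica_type': 'file', 'entity_id': '<uuid>', 'contents': {...}}
    #
    # and is passed in by PFBVerbatimManifestGenerator as
    # PFBEntity.for_replica(dict(replica), pfb_schema).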

@classmethod
def _add_missing_fields(cls, name: str, object_: MutableJSON, schema):
"""
@@ -303,6 +311,14 @@ def pfb_schema_from_field_types(field_types: FieldTypes) -> JSON:
return _avro_pfb_schema(entity_schemas)


def pfb_schema_from_replicas(replicas: Iterable[JSON]) -> tuple[AbstractSet[str], JSON]:
schemas_by_replica_type = _schemas_by_replica_type(replicas)
return (
schemas_by_replica_type.keys(),
_avro_pfb_schema(schemas_by_replica_type.values())
)
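
    # Usage sketch (illustration only):
    #
    #     replica_types, pfb_schema = pfb_schema_from_replicas(replicas)
    #
    # replica_types is the set of distinct 'replica_type' values observed; the
    # PFB manifest generator uses it to build the PFB Metadata entity.
    # pfb_schema is the complete Avro PFB schema, containing one inferred
    # record schema per replica type.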


def _avro_pfb_schema(azul_avro_schema: Iterable[JSON]) -> JSON:
"""
The boilerplate Avro schema that comprises a PFB's schema is returned in
@@ -483,6 +499,13 @@ def _inject_reference_handover_values(entity: MutableJSON, doc: JSON):
# that all of the primitive field types are nullable
# https://github.com/DataBiosphere/azul/issues/4094

_json_to_pfb_types = {
bool: 'boolean',
float: 'double',
int: 'long',
str: 'string'
}
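
    # For example, _json_to_pfb_types[type(42)] == 'long' and
    # _json_to_pfb_types[type(1.5)] == 'double'. JSON null, arrays and objects
    # are handled separately by the schema-inference helpers added below.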

_nullable_to_pfb_types = {
null_bool: ['null', 'boolean'],
null_float: ['null', 'double'],
@@ -570,10 +593,7 @@ def _entity_schema_recursive(field_types: FieldTypes,
'type': 'array',
'items': {
'type': 'array',
'items': {
int: 'long',
float: 'double'
}[field_type.ends_type.native_type]
'items': _json_to_pfb_types[field_type.ends_type.native_type]
}
}
}
@@ -612,3 +632,69 @@ def _entity_schema_recursive(field_types: FieldTypes,
pass
else:
assert False, field_type


def _schemas_by_replica_type(replicas: Iterable[JSON]) -> JSON:
schemas = {}
for replica in replicas:
replica_type = replica['replica_type']
replica_contents = replica['contents']
_update_replica_schema_recursive(schemas,
replica_type,
replica_type,
replica_contents)
return schemas
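
    # Worked example (illustration only, with hypothetical replica documents;
    # fields other than 'replica_type' and 'contents' are omitted):
    #
    #     replicas = [
    #         {'replica_type': 'file', 'contents': {'size': 1024, 'md5': None}},
    #         {'replica_type': 'file', 'contents': {'size': 2048, 'md5': 'abc'}},
    #     ]
    #     _schemas_by_replica_type(replicas)
    #
    # returns one record schema per replica type, with 'md5' widened to a
    # nullable union because both null and string values were observed:
    #
    #     {
    #         'file': {
    #             'name': 'file',
    #             'type': 'record',
    #             'fields': [
    #                 {'name': 'size', 'type': 'long'},
    #                 {'name': 'md5', 'type': ['null', 'string']}
    #             ]
    #         }
    #     }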


def _update_replica_schema_recursive(schema, key, name, value):
try:
old_type = schema[key]
except KeyError:
schema[key] = _new_replica_schema(name, value)
else:
if value is None:
if old_type == 'null' or isinstance(old_type, list):
pass
else:
schema[key] = ['null', old_type]
elif old_type == 'null':
schema[key] = ['null', _new_replica_schema(name, value)]
elif isinstance(value, list):
for v in value:
_update_replica_schema_recursive(old_type, 'items', name, v)
elif isinstance(value, dict):
for k, v in value.items():
# This will fail if the set of keys is inconsistent
field = one(field for field in old_type['fields'] if field['name'] == k)
_update_replica_schema_recursive(field, 'type', k, v)
else:
new_type = _json_to_pfb_types[type(value)]
if isinstance(old_type, list):
old_type = old_type[1]
assert old_type == new_type, (old_type, value)
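
    # Illustration only: array element types are refined under the 'items' key,
    # so an array that is empty in one replica and populated in another still
    # ends up with a concrete item type:
    #
    #     schema = {}
    #     _update_replica_schema_recursive(schema, 'sizes', 'sizes', [])
    #     # schema == {'sizes': {'type': 'array'}}, no 'items' yet
    #     _update_replica_schema_recursive(schema, 'sizes', 'sizes', [1, 2])
    #     # schema == {'sizes': {'type': 'array', 'items': 'long'}}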


def _new_replica_schema(name, value):
if value is None:
return 'null'
elif isinstance(value, list):
schema = {'type': 'array'}
# The `items` field will be absent from the schema if we never observe
# a nonempty array
for v in value:
_update_replica_schema_recursive(schema, 'items', name, v)
return schema
elif isinstance(value, dict):
return {
'name': name,
'type': 'record',
'fields': [
{
'name': k,
'type': _new_replica_schema(k, v)
}
for k, v in value.items()
]
}
else:
return _json_to_pfb_types[type(value)]
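
For orientation (not part of the commit), a minimal sketch of the record schema that _new_replica_schema produces for a nested value, using a hypothetical name and contents:

    _new_replica_schema('donor', {'age': 42, 'aliases': ['a', 'b']})

    # returns:
    {
        'name': 'donor',
        'type': 'record',
        'fields': [
            {'name': 'age', 'type': 'long'},
            {'name': 'aliases', 'type': {'type': 'array', 'items': 'string'}}
        ]
    }
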
34 changes: 32 additions & 2 deletions src/azul/service/manifest_service.py
@@ -2063,7 +2063,7 @@ def _all_replicas(self) -> Iterable[JSON]:
replica_id = replica.meta.id
if replica_id not in emitted_replica_ids:
num_new_replicas += 1
yield replica.contents.to_dict()
yield replica.to_dict()
# Note that this will be zero for replicas that use implicit
# hubs, in which case there are actually many hubs
explicit_hub_count = len(replica.hub_ids)
@@ -2109,6 +2109,36 @@ def create_file(self) -> tuple[str, Optional[str]]:
os.close(fd)
with open(path, 'w') as f:
for replica in self._all_replicas():
json.dump(replica, f)
json.dump(replica['contents'], f)
f.write('\n')
return path, None
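
    # Note (illustration, not part of the commit): _all_replicas now yields the
    # full replica document (see the change to yield replica.to_dict() above),
    # so this generator picks out only the verbatim 'contents' to keep each
    # JSONL line unchanged, while the PFB generator below also needs the
    # 'replica_type' and 'entity_id' fields from the same documents.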


class PFBVerbatimManifestGenerator(VerbatimManifestGenerator):

@property
def content_type(self) -> str:
return 'application/octet-stream'

@classmethod
def file_name_extension(cls):
return 'avro'

@classmethod
def format(cls) -> ManifestFormat:
return ManifestFormat.verbatim_pfb

def create_file(self) -> tuple[str, Optional[str]]:
replicas = list(self._all_replicas())
replica_types, pfb_schema = avro_pfb.pfb_schema_from_replicas(replicas)
pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types)

def pfb_entities():
yield pfb_metadata_entity
for replica in replicas:
yield avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema).to_json(())

fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
os.close(fd)
avro_pfb.write_pfb_entities(pfb_entities(), pfb_schema, path)
return path, None
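
A quick way to inspect the resulting manifest (illustration only; fastavro is assumed here merely because the output is a regular Avro object container file):

    import fastavro

    with open(path, 'rb') as f:
        records = list(fastavro.reader(f))

    # records[0] is expected to be the PFB Metadata entity produced by
    # pfb_metadata_entity(); each subsequent record wraps one replica, named
    # after its replica type, as built by PFBEntity.for_replica().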
