Skip to content

Commit

Permalink
Accommodate arbitrary unions in verbatim PFB schema
Browse files Browse the repository at this point in the history
...to facilitate HCA support
  • Loading branch information
nadove-ucsc committed Apr 12, 2024
1 parent 8495197 commit 3ca0110
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 57 deletions.
5 changes: 4 additions & 1 deletion src/azul/plugins/metadata/hca/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ def manifest_formats(self) -> Sequence[ManifestFormat]:
ManifestFormat.terra_bdbag,
ManifestFormat.terra_pfb,
ManifestFormat.curl,
*iif(config.enable_replicas, [ManifestFormat.verbatim_jsonl])
*iif(config.enable_replicas, [
ManifestFormat.verbatim_jsonl,
ManifestFormat.verbatim_pfb
])
]

@property
Expand Down
112 changes: 70 additions & 42 deletions src/azul/service/avro_pfb.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,17 @@ def _entity_schema_recursive(field_types: FieldTypes,
assert False, field_type


def _sort_pfb_union(schema: str | dict) -> str:
if isinstance(schema, str):
return schema
else:
return schema['type']


class SchemaUpdateException(Exception):
    """
    Signals that a value does not fit the schema it was checked against.

    Raised by :func:`_update_replica_schema` and caught by
    :func:`_update_replica_schema_union`, which then moves on to the next
    member of the union.
    """


def _update_replica_schema(*,
schema: MutableJSON,
path: Sequence[str],
Expand All @@ -648,54 +659,71 @@ def _update_replica_schema(*,
except KeyError:
schema[key] = _new_replica_schema(path=path, value=value)
else:
if old_type == []:
schema[key] = _new_replica_schema(path=path, value=value)
elif value is None:
if old_type == 'null' or isinstance(old_type, list):
if isinstance(old_type, list):
_update_replica_schema_union(schema=schema, path=path, key=key, value=value)
else:
if value is None and old_type == 'null':
pass
elif isinstance(value, list) and isinstance(old_type, dict) and old_type['type'] == 'array':
for v in value:
_update_replica_schema_union(schema=old_type,
path=path,
key='items',
value=v)
elif isinstance(value, dict) and isinstance(old_type, dict) and old_type['type'] == 'record':
old_fields = {field['name']: field for field in old_type['fields']}
for k in value.keys() | old_fields.keys():
try:
field = old_fields[k]
except KeyError:
field = {
'name': k,
'namespace': '.'.join(path),
'type': 'null'
}
bisect.insort(old_type['fields'], field, key=itemgetter('name'))
new_value = value[k]
else:
new_value = value.get(k)
_update_replica_schema_union(schema=field,
path=(*path, k),
key='type',
value=new_value)
else:
schema[key] = ['null', old_type]
elif old_type == 'null':
schema[key] = [
'null',
_new_replica_schema(path=path, value=value)
]
elif isinstance(value, list):
if isinstance(old_type, list):
old_type = old_type[1]
assert old_type['type'] == 'array', old_type
for v in value:
_update_replica_schema(schema=old_type,
path=path,
key='items',
value=v)
elif isinstance(value, dict):
if isinstance(old_type, list):
old_type = old_type[1]
assert old_type['type'] == 'record', old_type
old_fields = {field['name']: field for field in old_type['fields']}
for k in value.keys() | old_fields.keys():
try:
field = old_fields[k]
new_type = _json_to_pfb_types[type(value)]
except KeyError:
field = {
'name': k,
'namespace': '.'.join(path),
'type': 'null'
}
bisect.insort(old_type['fields'], field, key=itemgetter('name'))
new_value = value[k]
raise SchemaUpdateException
else:
new_value = value.get(k)
_update_replica_schema(schema=field,
path=(*path, k),
key='type',
value=new_value)
if new_type != old_type:
raise SchemaUpdateException


def _update_replica_schema_union(*,
schema: MutableJSON,
path: Sequence[str],
key: str,
value: AnyMutableJSON):
old_type = schema[key]
if not isinstance(old_type, list):
old_type = [old_type]
for union_member in old_type:
try:
_update_replica_schema(schema={key: union_member},
path=path,
key=key,
value=value)
except SchemaUpdateException:
continue
else:
break
else:
new_type = _new_replica_schema(path=path, value=value)
if old_type:
bisect.insort(old_type, new_type, key=_sort_pfb_union)
else:
new_type = _json_to_pfb_types[type(value)]
if isinstance(old_type, list):
old_type = old_type[1]
assert old_type == new_type, (old_type, value)
old_type = new_type
schema[key] = old_type


def _new_replica_schema(*,
Expand Down
28 changes: 14 additions & 14 deletions test/service/data/verbatim/pfb_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -188,33 +188,33 @@
"name": "consent_group",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": "string",
"type": "array"
}
},
"null"
]
},
{
"name": "data_modality",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": [],
"type": "array"
}
},
"null"
]
},
{
"name": "data_use_permission",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": "string",
"type": "array"
}
},
"null"
]
},
{
Expand Down Expand Up @@ -245,44 +245,44 @@
"name": "owner",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": "string",
"type": "array"
}
},
"null"
]
},
{
"name": "principal_investigator",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": [],
"type": "array"
}
},
"null"
]
},
{
"name": "registered_identifier",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": "string",
"type": "array"
}
},
"null"
]
},
{
"name": "source_datarepo_row_ids",
"namespace": "anvil_dataset",
"type": [
"null",
{
"items": "string",
"type": "array"
}
},
"null"
]
},
{
Expand Down

0 comments on commit 3ca0110

Please sign in to comment.