diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py index 212f4d673..6bf9d6af1 100644 --- a/src/azul/plugins/__init__.py +++ b/src/azul/plugins/__init__.py @@ -134,6 +134,7 @@ class ManifestFormat(Enum): terra_pfb = 'terra.pfb' curl = 'curl' verbatim_jsonl = 'verbatim.jsonl' + verbatim_pfb = 'verbatim.pfb' T = TypeVar('T', bound='Plugin') diff --git a/src/azul/service/avro_pfb.py b/src/azul/service/avro_pfb.py index 0a94c9340..951f2d04e 100644 --- a/src/azul/service/avro_pfb.py +++ b/src/azul/service/avro_pfb.py @@ -13,6 +13,7 @@ itemgetter, ) from typing import ( + AbstractSet, ClassVar, MutableSet, Self, @@ -198,6 +199,13 @@ def for_transform(cls, name: str, object_: MutableJSON, schema: JSON) -> Self: object_=object_, schema=schema) + @classmethod + def for_replica(cls, object_: MutableJSON, schema: JSON) -> Self: + return cls.from_json(name=object_['replica_type'], + ids=[object_['entity_id']], + object_=object_, + schema=schema) + @classmethod def _add_missing_fields(cls, name: str, object_: MutableJSON, schema): """ @@ -303,6 +311,14 @@ def pfb_schema_from_field_types(field_types: FieldTypes) -> JSON: return _avro_pfb_schema(entity_schemas) +def pfb_schema_from_replicas(replicas: Iterable[JSON]) -> tuple[AbstractSet[str], JSON]: + schemas_by_replica_type = _schemas_by_replica_type(replicas) + return ( + schemas_by_replica_type.keys(), + _avro_pfb_schema(schemas_by_replica_type.values()) + ) + + def _avro_pfb_schema(azul_avro_schema: Iterable[JSON]) -> JSON: """ The boilerplate Avro schema that comprises a PFB's schema is returned in @@ -483,6 +499,13 @@ def _inject_reference_handover_values(entity: MutableJSON, doc: JSON): # that all of the primitive field types types are nullable # https://github.com/DataBiosphere/azul/issues/4094 +_json_to_pfb_types = { + bool: 'boolean', + float: 'double', + int: 'long', + str: 'string' +} + _nullable_to_pfb_types = { null_bool: ['null', 'boolean'], null_float: ['null', 'double'], @@ -570,10 +593,7 @@ def _entity_schema_recursive(field_types: FieldTypes, 'type': 'array', 'items': { 'type': 'array', - 'items': { - int: 'long', - float: 'double' - }[field_type.ends_type.native_type] + 'items': _json_to_pfb_types[field_type.ends_type.native_type] } } } @@ -612,3 +632,69 @@ def _entity_schema_recursive(field_types: FieldTypes, pass else: assert False, field_type + + +def _schemas_by_replica_type(replicas: Iterable[JSON]) -> JSON: + schemas = {} + for replica in replicas: + replica_type = replica['replica_type'] + replica_contents = replica['contents'] + _update_replica_schema_recursive(schemas, + replica_type, + replica_type, + replica_contents) + return schemas + + +def _update_replica_schema_recursive(schema, key, name, value): + try: + old_type = schema[key] + except KeyError: + schema[key] = _new_replica_schema(name, value) + else: + if value is None: + if old_type == 'null' or isinstance(old_type, list): + pass + else: + schema[key] = ['null', old_type] + elif old_type == 'null': + schema[key] = ['null', _new_replica_schema(name, value)] + elif isinstance(value, list): + for v in value: + _update_replica_schema_recursive(old_type, 'items', name, v) + elif isinstance(value, dict): + for k, v in value.items(): + # This will fail if the set of keys is inconsistent + field = one(field for field in old_type['fields'] if field['name'] == k) + _update_replica_schema_recursive(field, 'type', k, v) + else: + new_type = _json_to_pfb_types[type(value)] + if isinstance(old_type, list): + old_type = old_type[1] + assert old_type == new_type, (old_type, value) + + +def _new_replica_schema(name, value): + if value is None: + return 'null' + elif isinstance(value, list): + schema = {'type': 'array'} + # The `items` field will be absent from the schema if we never observe + # a nonempty array + for v in value: + _update_replica_schema_recursive(schema, 'items', name, v) + return schema + elif isinstance(value, dict): + return { + 'name': name, + 'type': 'record', + 'fields': [ + { + 'name': k, + 'type': _new_replica_schema(k, v) + } + for k, v in value.items() + ] + } + else: + return _json_to_pfb_types[type(value)] diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 6830ab878..3d01bfcbd 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -2063,7 +2063,7 @@ def _all_replicas(self) -> Iterable[JSON]: replica_id = replica.meta.id if replica_id not in emitted_replica_ids: num_new_replicas += 1 - yield replica.contents.to_dict() + yield replica.to_dict() # Note that this will be zero for replicas that use implicit # hubs, in which case there are actually many hubs explicit_hub_count = len(replica.hub_ids) @@ -2109,6 +2109,36 @@ def create_file(self) -> tuple[str, Optional[str]]: os.close(fd) with open(path, 'w') as f: for replica in self._all_replicas(): - json.dump(replica, f) + json.dump(replica['contents'], f) f.write('\n') return path, None + + +class PFBVerbatimManifestGenerator(VerbatimManifestGenerator): + + @property + def content_type(self) -> str: + return 'application/octet-stream' + + @classmethod + def file_name_extension(cls): + return 'avro' + + @classmethod + def format(cls) -> ManifestFormat: + return ManifestFormat.verbatim_pfb + + def create_file(self) -> tuple[str, Optional[str]]: + replicas = list(self._all_replicas()) + replica_types, pfb_schema = avro_pfb.pfb_schema_from_replicas(replicas) + pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types) + + def pfb_entities(): + yield pfb_metadata_entity + for replica in replicas: + yield avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema).to_json(()) + + fd, path = mkstemp(suffix=f'.{self.file_name_extension()}') + os.close(fd) + avro_pfb.write_pfb_entities(pfb_entities(), pfb_schema, path) + return path, None