From 5c6fe2abd1803f2b0025c7202f08255b7eeec30f Mon Sep 17 00:00:00 2001
From: Hannes Schmidt
Date: Wed, 17 Apr 2024 19:30:50 -0700
Subject: [PATCH] Refactor and document PFB integration test

---
 test/integration_test.py | 68 ++++++++++++++++++++++++++++++++++------
 1 file changed, 59 insertions(+), 9 deletions(-)

diff --git a/test/integration_test.py b/test/integration_test.py
index e9da70ae7..cb2834f0b 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -21,6 +21,7 @@
     TextIOWrapper,
 )
 from itertools import (
+    count,
     starmap,
 )
 import json
@@ -954,17 +955,66 @@ def _check_terra_bdbag_manifest(self, catalog: CatalogName, response: bytes):
         self.assertEqual(size, int(response.headers['Content-Length']))
 
     def _check_terra_pfb_manifest(self, _catalog: CatalogName, response: bytes):
+        # A PFB is an Avro Object Container File, i.e., a stream of Avro objects
+        # preceded by a schema describing these objects. The internals of the
+        # format are slightly more complicated and are described in
+        #
+        # https://avro.apache.org/docs/1.11.1/specification/#object-container-files
+        #
         reader = fastavro.reader(BytesIO(response))
+        # The schema is also an Avro object, specifically an Avro record, which
+        # fastavro exposes to us as a JSON object, i.e., a `dict` with string
+        # keys.
+        record_schema = reader.writer_schema
+        # Each object in a PFB is also of type 'record'.
+        self.assertEqual('record', record_schema['type'])
+        # PFB calls the records *entities*. Unfortunately, the PFB standard is
+        # afflicted with confusing terminology, so bear with us.
+        self.assertEqual('Entity', record_schema['name'])
+        # Each entity record has four fields: `id`, `name`, `object` and
+        # `relations`. The `object` field holds the actual entity. The `name`
+        # field is a string denoting the type of entity. Entity records with
+        # the same value in the `name` field are expected to contain entities of
+        # the same shape. Here we extract the declaration of the `object` field
+        # from the schema:
+        object_field = one(f for f in record_schema['fields'] if f['name'] == 'object')
+        # The different shapes, i.e., entity types, are defined as members of a
+        # union type, which manifests in Avro simply as an array of schemas.
+        # Here we extract each union member and index it into a dictionary for
+        # easy access by name.
+        entity_types = {e['name']: e for e in object_field['type']}
+        self.assertEqual(len(entity_types), len(object_field['type']))
+        # The `id` field is a string uniquely identifying an entity among all
+        # entities of the same shape, i.e., with the same value in the `name`
+        # field of the containing record. The `relations` field holds references
+        # to other entities, as an array of nested Avro records, each record
+        # containing the `name` and `id` of the referenced entity.
+        num_records = count()
         for record in reader:
-            fastavro.validate(record, reader.writer_schema)
-            object_schema = one(f for f in reader.writer_schema['fields']
-                                if f['name'] == 'object')
-            entity_schema = one(e for e in object_schema['type']
-                                if e['name'] == record['name'])
-            fields = entity_schema['fields']
-            rows_present = set(record['object'].keys())
-            rows_expected = set(f['name'] for f in fields)
-            self.assertEqual(rows_present, rows_expected)
+            # Every record must follow the schema. Since each record's `object`
+            # field contains an entity, the schema check therefore extends to
+            # the various entity types.
+            fastavro.validate(record, record_schema)
+            if 0 == next(num_records):
+                # PFB requires a special `Metadata` entity to occur first. It is
+                # used to declare the relations between entity types, thereby
+                # expressing additional constraints on the `relations` field.
+                #
+                # FIXME: We don't currently declare relations
+                # https://github.com/DataBiosphere/azul/issues/6066
+                #
+                # For now, we just check the `name` and the absence of an `id`.
+                self.assertEqual('Metadata', record['name'])
+                self.assertIsNone(record['id'])
+            # The following is redundant given the schema validation above, but
+            # we'll leave it in for illustration.
+            fields = entity_types[record['name']]['fields']
+            fields_present = set(record['object'].keys())
+            fields_expected = set(f['name'] for f in fields)
+            self.assertEqual(fields_present, fields_expected)
+        # We expect to observe the special `Metadata` entity record and at least
+        # one additional entity record.
+        self.assertGreater(next(num_records), 1)
 
     def _read_csv_manifest(self, file: IO[bytes]) -> csv.DictReader:
         text = TextIOWrapper(file)
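
For readers unfamiliar with the format, here is a minimal standalone sketch of how a PFB
manifest could be inspected with fastavro outside of the test suite. It mirrors the checks
in `_check_terra_pfb_manifest` above. It is an illustration only, not part of the patch:
the helper name `inspect_pfb` and the file name `manifest.avro` are hypothetical, and it
assumes fastavro is installed and the manifest has been saved to disk.

    from collections import Counter

    import fastavro


    def inspect_pfb(path: str) -> Counter:
        with open(path, 'rb') as fo:
            reader = fastavro.reader(fo)
            record_schema = reader.writer_schema
            # Same sanity checks as in the test: the container holds `Entity`
            # records whose `object` field is a union of the entity types.
            assert record_schema['type'] == 'record'
            assert record_schema['name'] == 'Entity'
            object_field = next(f for f in record_schema['fields']
                                if f['name'] == 'object')
            print('Entity types:', sorted(e['name'] for e in object_field['type']))
            counts = Counter()
            for i, record in enumerate(reader):
                # Every record, including the leading `Metadata` record, must
                # conform to the writer schema embedded in the container file.
                fastavro.validate(record, record_schema)
                if i == 0:
                    assert record['name'] == 'Metadata'
                counts[record['name']] += 1
            return counts


    if __name__ == '__main__':
        print(inspect_pfb('manifest.avro'))

Counting records per `name` rather than in total makes it easy to see, besides the single
`Metadata` record, how many data entities of each type the manifest actually contains.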