From 39765a45d10987c5450223100273c4a648eb5762 Mon Sep 17 00:00:00 2001 From: Noa Aviel Dove Date: Tue, 12 Mar 2024 19:45:25 -0700 Subject: [PATCH] Add JSONL-based verbatim manifest format (#6028) --- lambdas/service/app.py | 8 +- lambdas/service/openapi.json | 12 ++- src/azul/plugins/__init__.py | 13 +++ src/azul/plugins/metadata/anvil/__init__.py | 14 ++- src/azul/plugins/metadata/hca/__init__.py | 8 ++ src/azul/service/elasticsearch_service.py | 2 +- src/azul/service/manifest_service.py | 107 ++++++++++++++++++++ test/integration_test.py | 12 ++- test/service/test_manifest.py | 26 +++++ 9 files changed, 193 insertions(+), 9 deletions(-) diff --git a/lambdas/service/app.py b/lambdas/service/app.py index 1495d0abab..e08bd22b70 100644 --- a/lambdas/service/app.py +++ b/lambdas/service/app.py @@ -228,7 +228,7 @@ # changes and reset the minor version to zero. Otherwise, increment only # the minor version for backwards compatible changes. A backwards # compatible change is one that does not require updates to clients. - 'version': '4.0' + 'version': '4.1' }, 'tags': [ { @@ -1398,6 +1398,10 @@ def manifest_route(*, fetch: bool, initiate: bool): file][4] manifest. This manifest can be used with the curl program to download all the files listed in the manifest. + - `{ManifestFormat.verbatim_jsonl.value}` for a verbatim + manifest in [JSONL][5] format. Each line contains an + unaltered metadata entity from the underlying repository. + [1]: https://bd2k.ini.usc.edu/tools/bdbag/ [2]: https://software.broadinstitute.org/firecloud/documentation/article?id=10954 @@ -1405,6 +1409,8 @@ def manifest_route(*, fetch: bool, initiate: bool): [3]: https://github.com/uc-cdis/pypfb [4]: https://curl.haxx.se/docs/manpage.html#-K + + [5]: https://jsonlines.org/ ''' ) ] if initiate else [], diff --git a/lambdas/service/openapi.json b/lambdas/service/openapi.json index 80221b3e94..e826453cc6 100644 --- a/lambdas/service/openapi.json +++ b/lambdas/service/openapi.json @@ -3,7 +3,7 @@ "info": { "title": "azul_service", "description": "\n# Overview\n\nAzul is a REST web service for querying metadata associated with\nboth experimental and analysis data from a data repository. In order\nto deliver response times that make it suitable for interactive use\ncases, the set of metadata properties that it exposes for sorting,\nfiltering, and aggregation is limited. Azul provides a uniform view\nof the metadata over a range of diverse schemas, effectively\nshielding clients from changes in the schemas as they occur over\ntime. It does so, however, at the expense of detail in the set of\nmetadata properties it exposes and in the accuracy with which it\naggregates them.\n\nAzul denormalizes and aggregates metadata into several different\nindices for selected entity types. Metadata entities can be queried\nusing the [Index](#operations-tag-Index) endpoints.\n\nA set of indices forms a catalog. There is a default catalog called\n`dcp2` which will be used unless a\ndifferent catalog name is specified using the `catalog` query\nparameter. Metadata from different catalogs is completely\nindependent: a response obtained by querying one catalog does not\nnecessarily correlate to a response obtained by querying another\none. Two catalogs can contain metadata from the same sources or\ndifferent sources. It is only guaranteed that the body of a\nresponse by any given endpoint adheres to one schema,\nindependently of which catalog was specified in the request.\n\nAzul provides the ability to download data and metadata via the\n[Manifests](#operations-tag-Manifests) endpoints. The\n`curl` format manifests can be used to\ndownload data files. Other formats provide various views of the\nmetadata. Manifests can be generated for a selection of files using\nfilters. These filters are interchangeable with the filters used by\nthe [Index](#operations-tag-Index) endpoints.\n\nAzul also provides a [summary](#operations-Index-get_index_summary)\nview of indexed data.\n\n## Data model\n\nAny index, when queried, returns a JSON array of hits. Each hit\nrepresents a metadata entity. Nested in each hit is a summary of the\nproperties of entities associated with the hit. An entity is\nassociated either by a direct edge in the original metadata graph,\nor indirectly as a series of edges. The nested properties are\ngrouped by the type of the associated entity. The properties of all\ndata files associated with a particular sample, for example, are\nlisted under `hits[*].files` in a `/index/samples` response. It is\nimportant to note that while each _hit_ represents a discrete\nentity, the properties nested within that hit are the result of an\naggregation over potentially many associated entities.\n\nTo illustrate this, consider a data file that is part of two\nprojects (a project is a group of related experiments, typically by\none laboratory, institution or consortium). Querying the `files`\nindex for this file yields a hit looking something like:\n\n```\n{\n \"projects\": [\n {\n \"projectTitle\": \"Project One\"\n \"laboratory\": ...,\n ...\n },\n {\n \"projectTitle\": \"Project Two\"\n \"laboratory\": ...,\n ...\n }\n ],\n \"files\": [\n {\n \"format\": \"pdf\",\n \"name\": \"Team description.pdf\",\n ...\n }\n ]\n}\n```\n\nThis example hit contains two kinds of nested entities (a hit in an\nactual Azul response will contain more): There are the two projects\nentities, and the file itself. These nested entities contain\nselected metadata properties extracted in a consistent way. This\nmakes filtering and sorting simple.\n\nAlso notice that there is only one file. When querying a particular\nindex, the corresponding entity will always be a singleton like\nthis.\n", - "version": "4.0" + "version": "4.1" }, "tags": [ { @@ -9478,10 +9478,11 @@ "compact", "terra.bdbag", "terra.pfb", - "curl" + "curl", + "verbatim.jsonl" ] }, - "description": "\nThe desired format of the output.\n\n- `compact` (the default) for a compact,\n tab-separated manifest\n\n- `terra.bdbag` for a manifest in the\n [BDBag format][1]. This provides a ZIP file containing two\n manifests: one for Participants (aka Donors) and one for\n Samples (aka Specimens). For more on the format of the\n manifests see [documentation here][2].\n\n- `terra.pfb` for a manifest in the [PFB\n format][3]. This format is mainly used for exporting data to\n Terra.\n\n- `curl` for a [curl configuration\n file][4] manifest. This manifest can be used with the curl\n program to download all the files listed in the manifest.\n\n[1]: https://bd2k.ini.usc.edu/tools/bdbag/\n\n[2]: https://software.broadinstitute.org/firecloud/documentation/article?id=10954\n\n[3]: https://github.com/uc-cdis/pypfb\n\n[4]: https://curl.haxx.se/docs/manpage.html#-K\n" + "description": "\nThe desired format of the output.\n\n- `compact` (the default) for a compact,\n tab-separated manifest\n\n- `terra.bdbag` for a manifest in the\n [BDBag format][1]. This provides a ZIP file containing two\n manifests: one for Participants (aka Donors) and one for\n Samples (aka Specimens). For more on the format of the\n manifests see [documentation here][2].\n\n- `terra.pfb` for a manifest in the [PFB\n format][3]. This format is mainly used for exporting data to\n Terra.\n\n- `curl` for a [curl configuration\n file][4] manifest. This manifest can be used with the curl\n program to download all the files listed in the manifest.\n\n- `verbatim.jsonl` for a verbatim\n manifest in [JSONL][5] format. Each line contains an\n unaltered metadata entity from the underlying repository.\n\n[1]: https://bd2k.ini.usc.edu/tools/bdbag/\n\n[2]: https://software.broadinstitute.org/firecloud/documentation/article?id=10954\n\n[3]: https://github.com/uc-cdis/pypfb\n\n[4]: https://curl.haxx.se/docs/manpage.html#-K\n\n[5]: https://jsonlines.org/\n" } ], "responses": { @@ -10885,10 +10886,11 @@ "compact", "terra.bdbag", "terra.pfb", - "curl" + "curl", + "verbatim.jsonl" ] }, - "description": "\nThe desired format of the output.\n\n- `compact` (the default) for a compact,\n tab-separated manifest\n\n- `terra.bdbag` for a manifest in the\n [BDBag format][1]. This provides a ZIP file containing two\n manifests: one for Participants (aka Donors) and one for\n Samples (aka Specimens). For more on the format of the\n manifests see [documentation here][2].\n\n- `terra.pfb` for a manifest in the [PFB\n format][3]. This format is mainly used for exporting data to\n Terra.\n\n- `curl` for a [curl configuration\n file][4] manifest. This manifest can be used with the curl\n program to download all the files listed in the manifest.\n\n[1]: https://bd2k.ini.usc.edu/tools/bdbag/\n\n[2]: https://software.broadinstitute.org/firecloud/documentation/article?id=10954\n\n[3]: https://github.com/uc-cdis/pypfb\n\n[4]: https://curl.haxx.se/docs/manpage.html#-K\n" + "description": "\nThe desired format of the output.\n\n- `compact` (the default) for a compact,\n tab-separated manifest\n\n- `terra.bdbag` for a manifest in the\n [BDBag format][1]. This provides a ZIP file containing two\n manifests: one for Participants (aka Donors) and one for\n Samples (aka Specimens). For more on the format of the\n manifests see [documentation here][2].\n\n- `terra.pfb` for a manifest in the [PFB\n format][3]. This format is mainly used for exporting data to\n Terra.\n\n- `curl` for a [curl configuration\n file][4] manifest. This manifest can be used with the curl\n program to download all the files listed in the manifest.\n\n- `verbatim.jsonl` for a verbatim\n manifest in [JSONL][5] format. Each line contains an\n unaltered metadata entity from the underlying repository.\n\n[1]: https://bd2k.ini.usc.edu/tools/bdbag/\n\n[2]: https://software.broadinstitute.org/firecloud/documentation/article?id=10954\n\n[3]: https://github.com/uc-cdis/pypfb\n\n[4]: https://curl.haxx.se/docs/manpage.html#-K\n\n[5]: https://jsonlines.org/\n" } ], "responses": { diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py index a9c38c06f2..212f4d6733 100644 --- a/src/azul/plugins/__init__.py +++ b/src/azul/plugins/__init__.py @@ -133,6 +133,7 @@ class ManifestFormat(Enum): terra_bdbag = 'terra.bdbag' terra_pfb = 'terra.pfb' curl = 'curl' + verbatim_jsonl = 'verbatim.jsonl' T = TypeVar('T', bound='Plugin') @@ -403,6 +404,18 @@ def _field_mapping(self) -> _FieldMapping: def source_id_field(self) -> str: raise NotImplementedError + @property + @abstractmethod + def implicit_hub_type(self) -> str: + """ + The type of entities that do not explicitly track their hubs in replica + documents in order to avoid a large list of hub references in the + replica document, and to avoid contention when updating that list during + indexing. Note that this is not a type of hub entities, but rather the + type of replica entities that have implicit hubs. + """ + raise NotImplementedError + @property def facets(self) -> Sequence[str]: return [ diff --git a/src/azul/plugins/metadata/anvil/__init__.py b/src/azul/plugins/metadata/anvil/__init__.py index 9fbd8e1cd4..6b9121402a 100644 --- a/src/azul/plugins/metadata/anvil/__init__.py +++ b/src/azul/plugins/metadata/anvil/__init__.py @@ -5,6 +5,10 @@ Type, ) +from azul import ( + config, + iif, +) from azul.indexer.document import ( DocumentType, EntityType, @@ -63,7 +67,11 @@ def exposed_indices(self) -> dict[EntityType, Sorting]: @property def manifest_formats(self) -> Sequence[ManifestFormat]: - return [ManifestFormat.compact, ManifestFormat.terra_pfb] + return [ + ManifestFormat.compact, + ManifestFormat.terra_pfb, + *iif(config.enable_replicas, [ManifestFormat.verbatim_jsonl]) + ] def transformer_types(self) -> Iterable[Type[BaseTransformer]]: return ( @@ -215,6 +223,10 @@ def _field_mapping(self) -> MetadataPlugin._FieldMapping: def source_id_field(self) -> str: return 'sourceId' + @property + def implicit_hub_type(self) -> str: + return 'datasets' + @property def facets(self) -> Sequence[str]: return [ diff --git a/src/azul/plugins/metadata/hca/__init__.py b/src/azul/plugins/metadata/hca/__init__.py index 8d7debdd08..6338314c7c 100644 --- a/src/azul/plugins/metadata/hca/__init__.py +++ b/src/azul/plugins/metadata/hca/__init__.py @@ -8,6 +8,9 @@ Type, ) +from azul import ( + config, +) from azul.indexer.document import ( Aggregate, DocumentType, @@ -160,6 +163,7 @@ def manifest_formats(self) -> Sequence[ManifestFormat]: ManifestFormat.terra_bdbag, ManifestFormat.terra_pfb, ManifestFormat.curl, + *([ManifestFormat.verbatim_jsonl] if config.enable_replicas else []) ] @property @@ -267,6 +271,10 @@ def _field_mapping(self) -> MetadataPlugin._FieldMapping: def source_id_field(self) -> str: return 'sourceId' + @property + def implicit_hub_type(self) -> str: + return 'projects' + @property def facets(self) -> Sequence[str]: return [ diff --git a/src/azul/service/elasticsearch_service.py b/src/azul/service/elasticsearch_service.py index b08f102cb8..b26e5bdbcb 100644 --- a/src/azul/service/elasticsearch_service.py +++ b/src/azul/service/elasticsearch_service.py @@ -668,7 +668,7 @@ def create_request(self, ) -> Search: """ Create an Elasticsearch request against the index containing documents - for the given entity+document type in the given catalog. + of the given entity and document types, in the given catalog. """ return Search(using=self._es_client, index=str(IndexName.create(catalog=catalog, diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index 8a3c2786ca..288e58561f 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -34,6 +34,7 @@ from itertools import ( chain, ) +import json import logging from math import ( ceil, @@ -69,6 +70,7 @@ bdbag_api, ) from elasticsearch_dsl import ( + Q, Search, ) from elasticsearch_dsl.response import ( @@ -78,6 +80,7 @@ furl, ) from more_itertools import ( + chunked, one, ) import msgpack @@ -107,6 +110,7 @@ aws, ) from azul.indexer.document import ( + DocumentType, FieldPath, FieldTypes, null_str, @@ -2002,3 +2006,106 @@ def qualify(qualifier, column_name, index=None): # Join concatenated values using the joiner row = {k: self.padded_joiner.join(sorted(v)) if isinstance(v, set) else v for k, v in row.items()} bundle_tsv_writer.writerow(row) + + +class VerbatimManifestGenerator(FileBasedManifestGenerator): + + @property + def content_type(self) -> str: + return 'application/jsonl' + + @classmethod + def file_name_extension(cls) -> str: + return 'jsonl' + + @classmethod + def format(cls) -> ManifestFormat: + return ManifestFormat.verbatim_jsonl + + @property + def entity_type(self) -> str: + return 'files' + + @property + def included_fields(self) -> list[FieldPath]: + # This is only used when searching the aggregates, which are only used + # to perform a "join" on the replicas index. Therefore, we only need the + # "keys" used for the join. + return [ + ('entity_id',), + ('contents', self.implicit_hub_type, 'document_id') + ] + + @property + def implicit_hub_type(self) -> str: + return self.service.metadata_plugin(self.catalog).implicit_hub_type + + @attrs.frozen(kw_only=True) + class ReplicaKeys: + """ + Most replicas contain a list of the entity ID of their hubs, usually + file entities. However, some low-cardinality entities like HCA projects + have too many hubs to track within their replica document. + + This class captures the information needed to locate all replicas + associated with a given a hub entity, either using the hub's entity ID + or the replica's entity ID. + """ + hub_id: str + replica_id: str + + def _replica_keys(self) -> Iterable[ReplicaKeys]: + hub_type = self.implicit_hub_type + request = self._create_request() + for hit in request.scan(): + yield self.ReplicaKeys(hub_id=hit['entity_id'], + replica_id=one(one(hit['contents'][hub_type])['document_id'])) + + def _all_replicas(self) -> Iterable[JSON]: + emitted_replica_ids = set() + page_size = 100 + for page in chunked(self._replica_keys(), page_size): + num_replicas = 0 + num_new_replicas = 0 + for replica in self._join_replicas(page): + num_replicas += 1 + # A single replica may have many hubs. To prevent replicas from + # being emitted more than once, we need to keep track of + # replicas already emitted. + replica_id = replica.meta.id + if replica_id not in emitted_replica_ids: + num_new_replicas += 1 + yield replica.contents.to_dict() + # Note that this will be zero for replicas that use implicit + # hubs, in which case there are actually many hubs + explicit_hub_count = len(replica.hub_ids) + # We don't have to track the IDs of replicas with only one + # hub, since we know that there are no other hubs that could + # cause their re-emission. + if explicit_hub_count != 1: + emitted_replica_ids.add(replica_id) + log.info('Found %d replicas (%d already emitted) from page of %d hubs', + num_replicas, num_replicas - num_new_replicas, len(page)) + + def _join_replicas(self, keys: Iterable[ReplicaKeys]) -> Iterable[Hit]: + request = self.service.create_request(catalog=self.catalog, + entity_type='replica', + doc_type=DocumentType.replica) + hub_ids, replica_ids = set(), set() + for key in keys: + hub_ids.add(key.hub_id) + replica_ids.add(key.replica_id) + request = request.query(Q('bool', should=[ + {'terms': {'hub_ids.keyword': list(hub_ids)}}, + {'terms': {'entity_id.keyword': list(replica_ids)}} + ])) + return request.scan() + + def create_file(self) -> tuple[str, Optional[str]]: + fd, path = mkstemp(suffix=f'.{self.file_name_extension()}') + os.close(fd) + with open(path, 'w') as f: + for replica in self._all_replicas(): + json.dump(replica, f) + f.write('\n') + return path, None diff --git a/test/integration_test.py b/test/integration_test.py index e8848725c6..e01efbc92c 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -583,7 +583,8 @@ def _test_manifest(self, catalog: CatalogName): ManifestFormat.compact: self._check_compact_manifest, ManifestFormat.terra_bdbag: self._check_terra_bdbag_manifest, ManifestFormat.terra_pfb: self._check_terra_pfb_manifest, - ManifestFormat.curl: self._check_curl_manifest + ManifestFormat.curl: self._check_curl_manifest, + ManifestFormat.verbatim_jsonl: self._check_jsonl_manifest } for format in [None, *supported_formats]: # IT catalogs with just one public source are always indexed @@ -1000,6 +1001,15 @@ def _check_curl_manifest(self, _catalog: CatalogName, response: bytes): log.info(f'Manifest contains {num_files} files.') self.assertGreater(num_files, 0) + def _check_jsonl_manifest(self, _catalog: CatalogName, response: bytes): + text = TextIOWrapper(BytesIO(response)) + num_replicas = 0 + for line in text: + json.loads(line) + num_replicas += 1 + log.info('Manifest contains %d replicas', num_replicas) + self.assertGreater(num_replicas, 0) + def _test_repository_files(self, catalog: str): with self.subTest('repository_files', catalog=catalog): file = self._get_one_inner_file(catalog) diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py index 0a52a4d7e2..4e7736e960 100644 --- a/test/service/test_manifest.py +++ b/test/service/test_manifest.py @@ -1273,6 +1273,32 @@ def test_manifest_content_disposition_header(self): actual_cd = query.params['response-content-disposition'] self.assertEqual(expected_cd, actual_cd) + @unittest.skipIf(not config.enable_replicas, + 'The format is replica-based') + @manifest_test + def test_verbatim_jsonl_manifest(self): + bundle = self._load_canned_bundle(one(self.bundles())) + expected = [ + bundle.metadata_files[d] + for d in [ + 'cell_suspension_0.json', + 'project_0.json', + 'sequence_file_0.json', + 'sequence_file_1.json', + 'specimen_from_organism_0.json' + ] + ] + response = self._get_manifest(ManifestFormat.verbatim_jsonl, {}) + self.assertEqual(200, response.status_code) + response = list(map(json.loads, response.content.decode().splitlines())) + + def sort_key(hca_doc: JSON) -> str: + return hca_doc['provenance']['document_id'] + + expected.sort(key=sort_key) + response.sort(key=sort_key) + self.assertEqual(expected, response) + class TestManifestCache(ManifestTestCase):