From 7834873eed57be536f46a29d7d0cd71b765496cd Mon Sep 17 00:00:00 2001
From: Noa Aviel Dove
Date: Tue, 31 Oct 2023 17:05:11 -0700
Subject: [PATCH] [a r] Index dataset description from Terra API (#5547)

---
 deployments/anvilbox/environment.py           |  1 +
 deployments/anvildev/environment.py           |  1 +
 environment.py                                |  5 ++
 lambdas/service/app.py                        |  2 +-
 lambdas/service/openapi.json                  |  2 +-
 src/azul/__init__.py                          |  5 ++
 .../metadata/anvil/indexer/transform.py       | 39 +++++++---
 .../plugins/repository/tdr_anvil/__init__.py  | 76 +++++++++++++++----
 src/azul/terra.py                             | 10 +++
 test/indexer/__init__.py                      | 17 +++++
 ...2783-aeb6-afea-e022897f4dcf.tdr.anvil.json |  8 ++
 test/indexer/test_anvil.py                    | 62 ++++++++++++++-
 test/indexer/test_indexer.py                  |  5 --
 test/integration_test.py                      | 13 ++--
 14 files changed, 205 insertions(+), 41 deletions(-)
 create mode 100644 test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json

diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py
index ee6555aeab..e5e3c15a4d 100644
--- a/deployments/anvilbox/environment.py
+++ b/deployments/anvilbox/environment.py
@@ -141,6 +141,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
 
         **(
             {
diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py
index fa599604aa..4a66afcbb9 100644
--- a/deployments/anvildev/environment.py
+++ b/deployments/anvildev/environment.py
@@ -115,6 +115,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
 
         'AZUL_ENABLE_MONITORING': '1',
 
diff --git a/environment.py b/environment.py
index 1bbb5dfd03..6185fcf082 100644
--- a/environment.py
+++ b/environment.py
@@ -618,6 +618,11 @@ def env() -> Mapping[str, Optional[str]]:
         #
         'AZUL_SAM_SERVICE_URL': None,
 
+        # The URL of Terra's DUOS service from which to index descriptions of
+        # AnVIL datasets. If left unset, this step is skipped during indexing.
+        #
+        'AZUL_DUOS_SERVICE_URL': None,
+
         # OAuth2 Client ID to be used for authenticating users. See section
         # 3.2 of the README
         #
diff --git a/lambdas/service/app.py b/lambdas/service/app.py
index e1f2c136f8..354c906f9f 100644
--- a/lambdas/service/app.py
+++ b/lambdas/service/app.py
@@ -227,7 +227,7 @@
     # changes and reset the minor version to zero. Otherwise, increment only
     # the minor version for backwards compatible changes. A backwards
    # compatible change is one that does not require updates to clients.
-    'version': '2.0'
+    'version': '2.1'
 },
 'tags': [
     {
diff --git a/lambdas/service/openapi.json b/lambdas/service/openapi.json
index 6dc25db49f..6148675ceb 100644
--- a/lambdas/service/openapi.json
+++ b/lambdas/service/openapi.json
@@ -3,7 +3,7 @@
     "info": {
         "title": "azul_service",
         "description": "\n# Overview\n\nAzul is a REST web service for querying metadata associated with\nboth experimental and analysis data from a data repository. In order\nto deliver response times that make it suitable for interactive use\ncases, the set of metadata properties that it exposes for sorting,\nfiltering, and aggregation is limited. Azul provides a uniform view\nof the metadata over a range of diverse schemas, effectively\nshielding clients from changes in the schemas as they occur over\ntime. It does so, however, at the expense of detail in the set of\nmetadata properties it exposes and in the accuracy with which it\naggregates them.\n\nAzul denormalizes and aggregates metadata into several different\nindices for selected entity types. Metadata entities can be queried\nusing the [Index](#operations-tag-Index) endpoints.\n\nA set of indices forms a catalog. There is a default catalog called\n`dcp2` which will be used unless a\ndifferent catalog name is specified using the `catalog` query\nparameter. Metadata from different catalogs is completely\nindependent: a response obtained by querying one catalog does not\nnecessarily correlate to a response obtained by querying another\none. Two catalogs can contain metadata from the same sources or\ndifferent sources. It is only guaranteed that the body of a\nresponse by any given endpoint adheres to one schema,\nindependently of which catalog was specified in the request.\n\nAzul provides the ability to download data and metadata via the\n[Manifests](#operations-tag-Manifests) endpoints. The\n`curl` format manifests can be used to\ndownload data files. Other formats provide various views of the\nmetadata. Manifests can be generated for a selection of files using\nfilters. These filters are interchangeable with the filters used by\nthe [Index](#operations-tag-Index) endpoints.\n\nAzul also provides a [summary](#operations-Index-get_index_summary)\nview of indexed data.\n\n## Data model\n\nAny index, when queried, returns a JSON array of hits. Each hit\nrepresents a metadata entity. Nested in each hit is a summary of the\nproperties of entities associated with the hit. An entity is\nassociated either by a direct edge in the original metadata graph,\nor indirectly as a series of edges. The nested properties are\ngrouped by the type of the associated entity. The properties of all\ndata files associated with a particular sample, for example, are\nlisted under `hits[*].files` in a `/index/samples` response. It is\nimportant to note that while each _hit_ represents a discrete\nentity, the properties nested within that hit are the result of an\naggregation over potentially many associated entities.\n\nTo illustrate this, consider a data file that is part of two\nprojects (a project is a group of related experiments, typically by\none laboratory, institution or consortium). Querying the `files`\nindex for this file yields a hit looking something like:\n\n```\n{\n \"projects\": [\n {\n \"projectTitle\": \"Project One\"\n \"laboratory\": ...,\n ...\n },\n {\n \"projectTitle\": \"Project Two\"\n \"laboratory\": ...,\n ...\n }\n ],\n \"files\": [\n {\n \"format\": \"pdf\",\n \"name\": \"Team description.pdf\",\n ...\n }\n ]\n}\n```\n\nThis example hit contains two kinds of nested entities (a hit in an\nactual Azul response will contain more): There are the two projects\nentities, and the file itself. These nested entities contain\nselected metadata properties extracted in a consistent way. This\nmakes filtering and sorting simple.\n\nAlso notice that there is only one file. When querying a particular\nindex, the corresponding entity will always be a singleton like\nthis.\n",
-        "version": "2.0"
+        "version": "2.1"
     },
     "tags": [
         {
diff --git a/src/azul/__init__.py b/src/azul/__init__.py
index d91abc9dde..85d46f9de4 100644
--- a/src/azul/__init__.py
+++ b/src/azul/__init__.py
@@ -322,6 +322,11 @@ def tdr_service_url(self) -> mutable_furl:
     def sam_service_url(self) -> mutable_furl:
         return furl(self.environ['AZUL_SAM_SERVICE_URL'])
 
+    @property
+    def duos_service_url(self) -> Optional[mutable_furl]:
+        url = self.environ.get('AZUL_DUOS_SERVICE_URL')
+        return None if url is None else furl(url)
+
     @property
     def dss_query_prefix(self) -> str:
         return self.environ.get('AZUL_DSS_QUERY_PREFIX', '')
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index b21ab34f9c..a57b85a65b 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -133,7 +133,7 @@ def field_types(cls) -> FieldTypes:
         return {
             'activities': cls._activity_types(),
             'biosamples': cls._biosample_types(),
-            'datasets': cls._dataset_types(),
+            'datasets': {**cls._dataset_types(), **cls._duos_types()},
             'diagnoses': cls._diagnosis_types(),
             'donors': cls._donor_types(),
             'files': cls._aggregate_file_types(),
@@ -229,6 +229,13 @@ def _biosample_types(cls) -> FieldTypes:
             'donor_age_at_collection': pass_thru_json,
         }
 
+    @classmethod
+    def _duos_types(cls) -> FieldTypes:
+        return {
+            'document_id': null_str,
+            'description': null_str,
+        }
+
     @classmethod
     def _dataset_types(cls) -> FieldTypes:
         return {
@@ -378,6 +385,9 @@ def _biosample(self, biosample: EntityReference) -> MutableJSON:
     def _dataset(self, dataset: EntityReference) -> MutableJSON:
         return self._entity(dataset, self._dataset_types())
 
+    def _duos(self, dataset: EntityReference) -> MutableJSON:
+        return self._entity(dataset, self._duos_types())
+
     def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON:
         return self._entity(diagnosis,
                             self._diagnosis_types(),
@@ -469,17 +479,22 @@ def entity_type(cls) -> str:
         return 'datasets'
 
     def _transform(self, entity: EntityReference) -> Contribution:
-        contents = dict(
-            activities=self._entities(self._activity, chain.from_iterable(
-                self._entities_by_type[activity_type]
-                for activity_type in self._activity_polymorphic_types
-            )),
-            biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
-            datasets=[self._dataset(entity)],
-            diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
-            donors=self._entities(self._donor, self._entities_by_type['donor']),
-            files=self._entities(self._file, self._entities_by_type['file']),
-        )
+        try:
+            dataset = self._dataset(entity)
+        except KeyError:
+            contents = dict(datasets=[self._duos(entity)])
+        else:
+            contents = dict(
+                activities=self._entities(self._activity, chain.from_iterable(
+                    self._entities_by_type[activity_type]
+                    for activity_type in self._activity_polymorphic_types
+                )),
+                biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
+                datasets=[dataset],
+                diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
+                donors=self._entities(self._donor, self._entities_by_type['donor']),
+                files=self._entities(self._file, self._entities_by_type['file']),
+            )
         return self._contribution(contents, entity)
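Note: the `_transform` change above emits either a full dataset contribution or, for the DUOS-only entity, a sparse contribution that carries nothing but the `description`. The sketch below illustrates the merge semantics this relies on during aggregation: the two contributions populate disjoint fields, so the aggregate ends up with one complete dataset document. It is a toy model, not Azul's aggregation code; `merge_contributions` is a hypothetical helper and the field values are borrowed from this patch's test data.

```python
# Minimal sketch of why a sparse DUOS contribution can be merged with the
# full dataset contribution: the sparse one only supplies fields that the
# full one leaves unset, so merging never overwrites anything.
def merge_contributions(full: dict, sparse: dict) -> dict:
    merged = dict(full)
    for field, value in sparse.items():
        assert field not in merged, field  # fields must not collide
        merged[field] = value
    return merged

full = {'document_id': '2370f948-2783-4eb6-afea-e022897f4dcf',
        'registered_identifier': ['phs000693']}
sparse = {'description': 'Study description from DUOS'}
assert merge_contributions(full, sparse)['description'] \
       == 'Study description from DUOS'
```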
diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py
index a5b8218c1f..e0114f1973 100644
--- a/src/azul/plugins/repository/tdr_anvil/__init__.py
+++ b/src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -60,6 +60,7 @@
     MutableJSONs,
 )
 from azul.uuids import (
+    change_version,
     validate_uuid_prefix,
 )
 
@@ -98,9 +99,21 @@ class BundleEntityType(Enum):
     specifically tailored traversal implementation. Supplementary bundles
     always consist of exactly two entities: one file (the bundle entity) and
     one dataset.
+
+    The `dataset.description` field is unusual in that it is not stored in
+    BigQuery and must be retrieved via Terra's DUOS API. There is only one
+    dataset per snapshot, which is referenced in all primary and supplementary
+    bundles. Therefore, only one request to DUOS per *snapshot* is necessary,
+    but if `description` were retrieved together with the other dataset
+    fields, we would make one request per *bundle* instead, potentially
+    overloading the DUOS service. Our solution is to retrieve `description`
+    only in a dedicated bundle format, once per snapshot, and to merge it with
+    the other dataset fields during aggregation. This bundle consists of a
+    single dataset entity with only the `description` field populated.
     """
     primary: EntityType = 'biosample'
     supplementary: EntityType = 'file'
+    duos: EntityType = 'dataset'
 
 
 class AnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
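Note: the hunks below reuse the existing "reversibly tweak the entity UUID" trick so that a DUOS bundle's UUID can be derived from the dataset row's UUID in `_list_bundles` and converted back in `_dataset_description`. Here is a self-contained sketch of what `azul.uuids.change_version` is assumed to do, using the two UUIDs that appear in this patch's test data; the real helper may differ in details.

```python
# Minimal sketch, assuming change_version rewrites the UUID's version nibble
# (the 13th hex digit) after checking that it currently holds the old value.
def change_version(uuid: str, old: int, new: int) -> str:
    assert int(uuid[14], 16) == old, (uuid, old)
    return uuid[:14] + format(new, 'x') + uuid[15:]

row_id = '2370f948-2783-4eb6-afea-e022897f4dcf'  # datarepo row, version 4
bundle_id = change_version(row_id, 4, 0xa)       # bundle, version 'a'
assert bundle_id == '2370f948-2783-aeb6-afea-e022897f4dcf'
assert change_version(bundle_id, 0xa, 4) == row_id  # round-trips
```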
@@ -178,7 +191,8 @@ def _list_bundles(self,
         validate_uuid_prefix(partition_prefix)
         primary = BundleEntityType.primary.value
         supplementary = BundleEntityType.supplementary.value
-        rows = self._run_sql(f'''
+        dataset = BundleEntityType.duos.value
+        rows = list(self._run_sql(f'''
             SELECT datarepo_row_id, {primary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, primary))}
             WHERE STARTS_WITH(datarepo_row_id, '{partition_prefix}')
@@ -186,18 +200,37 @@ def _list_bundles(self,
             SELECT datarepo_row_id, {supplementary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
             WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{partition_prefix}')
-        ''')
-        return [
-            AnvilBundleFQID(source=source,
-                            # Reversibly tweak the entity UUID to prevent
-                            # collisions between entity IDs and bundle IDs
-                            uuid=uuids.change_version(row['datarepo_row_id'],
-                                                      self.datarepo_row_uuid_version,
-                                                      self.bundle_uuid_version),
-                            version=self._version,
-                            entity_type=BundleEntityType(row['entity_type']))
-            for row in rows
-        ]
+        ''' + (
+            '' if config.duos_service_url is None else f'''
+            UNION ALL
+            SELECT datarepo_row_id, {dataset!r} AS entity_type
+            FROM {backtick(self._full_table_name(spec, dataset))}
+        '''
+        )))
+        bundles = []
+        dataset_count = 0
+        for row in rows:
+            # We intentionally omit the WHERE clause for datasets so that we can
+            # verify our assumption that each snapshot only contains rows for a
+            # single dataset. This verification is performed independently and
+            # concurrently for every partition, but only one partition actually
+            # emits the bundle.
+            if row['entity_type'] == dataset:
+                require(0 == dataset_count)
+                dataset_count += 1
+                if not row['datarepo_row_id'].startswith(partition_prefix):
+                    continue
+            bundles.append(AnvilBundleFQID(
+                source=source,
+                # Reversibly tweak the entity UUID to prevent
+                # collisions between entity IDs and bundle IDs
+                uuid=uuids.change_version(row['datarepo_row_id'],
+                                          self.datarepo_row_uuid_version,
+                                          self.bundle_uuid_version),
+                version=self._version,
+                entity_type=BundleEntityType(row['entity_type'])
+            ))
+        return bundles
 
     def list_partitions(self,
                         source: TDRSourceRef
@@ -250,6 +283,10 @@ def _emulate_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         elif bundle_fqid.entity_type is BundleEntityType.supplementary:
             log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
             return self._supplementary_bundle(bundle_fqid)
+        elif bundle_fqid.entity_type is BundleEntityType.duos:
+            assert config.duos_service_url is not None, bundle_fqid
+            log.info('Bundle %r is a dataset description', bundle_fqid.uuid)
+            return self._dataset_description(bundle_fqid)
         else:
             assert False, bundle_fqid.entity_type
 
@@ -334,6 +371,19 @@ def _supplementary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         result.add_links({Link(**link_args)}, entities_by_key)
         return result
 
+    def _dataset_description(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
+        description = self.tdr.get_duos(bundle_fqid.source)['studyDescription']
+        entity_id = change_version(bundle_fqid.uuid,
+                                   self.bundle_uuid_version,
+                                   self.datarepo_row_uuid_version)
+        entity = EntityReference(entity_type=bundle_fqid.entity_type.value,
+                                 entity_id=entity_id)
+        bundle = TDRAnvilBundle(fqid=bundle_fqid)
+        bundle.add_entity(entity=entity,
+                          version=self._version,
+                          row={'description': description})
+        return bundle
+
     def _bundle_entity(self, bundle_fqid: AnvilBundleFQID) -> KeyReference:
         source = bundle_fqid.source
         bundle_uuid = bundle_fqid.uuid
diff --git a/src/azul/terra.py b/src/azul/terra.py
index 103025cb87..ed68f4e25f 100644
--- a/src/azul/terra.py
+++ b/src/azul/terra.py
@@ -555,6 +555,9 @@ def _job_info(self, job: QueryJob) -> JSON:
     def _repository_endpoint(self, *path: str) -> mutable_furl:
         return config.tdr_service_url.set(path=('api', 'repository', 'v1', *path))
 
+    def _duos_endpoint(self, *path: str) -> mutable_furl:
+        return config.duos_service_url.set(path=('api', *path))
+
     def _check_response(self,
                         endpoint: furl,
                         response: urllib3.HTTPResponse
@@ -659,3 +662,10 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient':
 
     def drs_client(self) -> DRSClient:
         return DRSClient(http_client=self._http_client)
+
+    def get_duos(self, source: SourceRef) -> MutableJSON:
+        response = self._retrieve_source(source)
+        duos_id = response['duosFirecloudGroup']['duosId']
+        url = self._duos_endpoint('dataset', 'registration', duos_id)
+        response = self._request('GET', url)
+        return self._check_response(url, response)
diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py
index 17c8dad040..b9f2ae61c3 100644
--- a/test/indexer/__init__.py
+++ b/test/indexer/__init__.py
@@ -19,6 +19,9 @@
 from elasticsearch.helpers import (
     scan,
 )
+from more_itertools import (
+    one,
+)
 
 from azul import (
     CatalogName,
@@ -30,6 +33,7 @@
     SourcedBundleFQID,
 )
 from azul.indexer.document import (
+    DocumentType,
     IndexName,
 )
 from azul.indexer.index_service import (
@@ -137,9 +141,22 @@ def _get_all_hits(self):
                 index=','.join(self.index_service.index_names(self.catalog)),
                 preserve_order=True))
         for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            if (
+                entity_type == 'datasets'
+                and doc_type is DocumentType.contribution
+                and 'description' in one(hit['_source']['contents']['datasets'])
+            ):
+                # Sparse dataset contributions contain no lists
+                continue
             self._verify_sorted_lists(hit['_source'])
         return hits
 
+    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
+        index_name = IndexName.parse(hit['_index'])
+        index_name.validate()
+        return index_name.entity_type, index_name.doc_type
+
     def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs:
         """
         Load the canned index documents for the given canned bundle and fix the
diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
new file mode 100644
index 0000000000..4a9831df0c
--- /dev/null
+++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
@@ -0,0 +1,8 @@
+{
+    "entities": {
+        "dataset/2370f948-2783-4eb6-afea-e022897f4dcf": {
+            "description": "Study description from DUOS"
+        }
+    },
+    "links": []
+}
\ No newline at end of file
diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py
index 7627149361..03b1b965ba 100644
--- a/test/indexer/test_anvil.py
+++ b/test/indexer/test_anvil.py
@@ -1,6 +1,12 @@
+from collections import (
+    defaultdict,
+)
 from operator import (
     itemgetter,
 )
+from typing import (
+    cast,
+)
 import unittest
 
 from more_itertools import (
@@ -11,13 +17,16 @@
     CatalogName,
     config,
 )
-from azul.indexer import (
-    SourcedBundleFQID,
+from azul.indexer.document import (
+    DocumentType,
+    EntityReference,
 )
 from azul.logging import (
     configure_test_logging,
 )
 from azul.plugins.repository.tdr_anvil import (
+    AnvilBundleFQID,
+    BundleEntityType,
     TDRAnvilBundle,
 )
 from indexer import (
@@ -36,14 +45,26 @@ def setUpModule():
 class AnvilIndexerTestCase(IndexerTestCase, TDRAnvilPluginTestCase):
 
     @classmethod
-    def bundles(cls) -> list[SourcedBundleFQID]:
+    def bundle_fqid(cls,
+                    *,
+                    uuid,
+                    version,
+                    entity_type=BundleEntityType.primary
+                    ) -> AnvilBundleFQID:
+        return AnvilBundleFQID(source=cls.source,
+                               uuid=uuid,
+                               version=version,
+                               entity_type=entity_type)
+
+    @classmethod
+    def bundles(cls) -> list[AnvilBundleFQID]:
         return [
             cls.bundle_fqid(uuid='826dea02-e274-affe-aabc-eb3db63ad068',
                             version='foo')
         ]
 
     @property
-    def bundle(self) -> SourcedBundleFQID:
+    def bundle(self) -> AnvilBundleFQID:
         return one(self.bundles())
 
     @classmethod
@@ -76,6 +97,39 @@ def test_indexing(self):
         expected_hits = self._load_canned_result(self.bundle)
         self.assertEqual(expected_hits, hits)
 
+    def test_dataset_description(self):
+        dataset_ref = EntityReference(entity_type='dataset',
+                                      entity_id='2370f948-2783-4eb6-afea-e022897f4dcf')
+        dataset_bundle = self.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
+                                          version=self.bundle.version,
+                                          entity_type=BundleEntityType.duos)
+
+        bundles = [self.bundle, dataset_bundle]
+        for bundle_fqid in bundles:
+            bundle = cast(TDRAnvilBundle, self._load_canned_bundle(bundle_fqid))
+            bundle.links.clear()
+            bundle.entities = {dataset_ref: bundle.entities[dataset_ref]}
+            self._index_bundle(bundle, delete=False)
+
+        hits = self._get_all_hits()
+        doc_counts: dict[DocumentType, int] = defaultdict(int)
+        for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            doc_counts[doc_type] += 1
+            self.assertEqual('datasets', entity_type)
+            if doc_type is DocumentType.aggregate:
+                self.assertEqual(2, hit['_source']['num_contributions'])
+                self.assertEqual(sorted(b.uuid for b in bundles),
+                                 sorted(b['uuid'] for b in hit['_source']['bundles']))
+                contents = one(hit['_source']['contents']['datasets'])
+                # These fields are populated only in the primary bundle
+                self.assertEqual(dataset_ref.entity_id, contents['document_id'])
+                self.assertEqual(['phs000693'], contents['registered_identifier'])
+                # This field is populated only in the sparse dataset bundle
+                self.assertEqual('Study description from DUOS', contents['description'])
+        self.assertEqual(1, doc_counts[DocumentType.aggregate])
+        self.assertEqual(2, doc_counts[DocumentType.contribution])
+
 # FIXME: Enable test after the issue with TinyQuery `WITH` has been resolved
 #        https://github.com/DataBiosphere/azul/issues/5046
 @unittest.skip('TinyQuery does not support the WITH clause')
diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py
index 7af796542e..aec29a6f30 100644
--- a/test/indexer/test_indexer.py
+++ b/test/indexer/test_indexer.py
@@ -216,11 +216,6 @@ def test_deletion(self):
                 self.index_service.delete_indices(self.catalog)
                 self.index_service.create_indices(self.catalog)
 
-    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
-        index_name = IndexName.parse(hit['_index'])
-        index_name.validate()
-        return index_name.entity_type, index_name.doc_type
-
     def _filter_hits(self,
                      hits: JSONs,
                      doc_type: Optional[DocumentType] = None,
diff --git a/test/integration_test.py b/test/integration_test.py
index dd33da8312..03c71dae2e 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -1597,11 +1597,14 @@ def _test_catalog(self, catalog: config.Catalog):
                 self.assertIsInstance(entities, dict)
                 self.assertIsInstance(links, list)
                 entities = set(map(EntityReference.parse, entities.keys()))
-                linked_entities = frozenset.union(*(
-                    Link.from_json(link).all_entities
-                    for link in links
-                ))
-                self.assertEqual(entities, linked_entities)
+                if len(entities) > 1:
+                    linked_entities = frozenset().union(*(
+                        Link.from_json(link).all_entities
+                        for link in links
+                    ))
+                    self.assertEqual(entities, linked_entities)
+                else:
+                    self.assertEqual([], links)
             else:
                 assert False, metadata_plugin_name
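Note: for reviewers unfamiliar with the DUOS flow, here is the request sequence that `TDRClient.get_duos` performs, written as a standalone sketch using `requests` and a plain bearer token. The DUOS registration path mirrors `_duos_endpoint` above; the TDR snapshot endpoint and everything outside `duosFirecloudGroup.duosId` and `studyDescription` are assumptions made for illustration, not Azul's actual client code.

```python
import requests

TDR_BASE = 'https://jade.datarepo-dev.broadinstitute.org'   # AZUL_TDR_SERVICE_URL
DUOS_BASE = 'https://consent.dsde-dev.broadinstitute.org'   # AZUL_DUOS_SERVICE_URL


def get_study_description(snapshot_id: str, token: str) -> str:
    headers = {'Authorization': f'Bearer {token}'}
    # Resolve the snapshot to its DUOS identifier. In the patch, this lookup
    # is handled by TDRClient._retrieve_source; the endpoint used here is an
    # assumption for the sketch.
    snapshot = requests.get(
        f'{TDR_BASE}/api/repository/v1/snapshots/{snapshot_id}',
        headers=headers)
    snapshot.raise_for_status()
    duos_id = snapshot.json()['duosFirecloudGroup']['duosId']
    # Fetch the dataset registration from DUOS, as TDRClient.get_duos does
    registration = requests.get(
        f'{DUOS_BASE}/api/dataset/registration/{duos_id}',
        headers=headers)
    registration.raise_for_status()
    return registration.json()['studyDescription']
```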