diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py
index ee6555aea..e5e3c15a4 100644
--- a/deployments/anvilbox/environment.py
+++ b/deployments/anvilbox/environment.py
@@ -141,6 +141,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
 
         **(
             {
diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py
index fa599604a..4a66afcbb 100644
--- a/deployments/anvildev/environment.py
+++ b/deployments/anvildev/environment.py
@@ -115,6 +115,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
 
         'AZUL_ENABLE_MONITORING': '1',
diff --git a/environment.py b/environment.py
index 1bbb5dfd0..6185fcf08 100644
--- a/environment.py
+++ b/environment.py
@@ -618,6 +618,11 @@ def env() -> Mapping[str, Optional[str]]:
         #
         'AZUL_SAM_SERVICE_URL': None,
 
+        # The URL of Terra's DUOS service from which to index descriptions of
+        # AnVIL datasets. If left unset, this step is skipped during indexing.
+        #
+        'AZUL_DUOS_SERVICE_URL': None,
+
         # OAuth2 Client ID to be used for authenticating users. See section
         # 3.2 of the README
         #
diff --git a/lambdas/service/app.py b/lambdas/service/app.py
index e1f2c136f..354c906f9 100644
--- a/lambdas/service/app.py
+++ b/lambdas/service/app.py
@@ -227,7 +227,7 @@
         # changes and reset the minor version to zero. Otherwise, increment only
         # the minor version for backwards compatible changes. A backwards
         # compatible change is one that does not require updates to clients.
-        'version': '2.0'
+        'version': '2.1'
     },
     'tags': [
         {
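Reviewer note: the sketch below (illustration only, not part of the change) shows the gating behavior that the comment added to `environment.py` describes for `AZUL_DUOS_SERVICE_URL`: unset disables the DUOS indexing step, set yields a parsed URL. The simplified `Config` class here is a stand-in for the `duos_service_url` property added to `src/azul/__init__.py` further down; `furl` is the URL library Azul already uses.

```python
from typing import Optional

from furl import furl  # the URL type Azul already uses


class Config:
    """Simplified stand-in for azul.Config; illustration only."""

    def __init__(self, environ: dict[str, str]) -> None:
        self.environ = environ

    @property
    def duos_service_url(self) -> Optional[furl]:
        # An unset variable disables the DUOS indexing step entirely;
        # there is deliberately no default URL
        url = self.environ.get('AZUL_DUOS_SERVICE_URL')
        return None if url is None else furl(url)


assert Config({}).duos_service_url is None
config = Config({'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org'})
assert config.duos_service_url.host == 'consent.dsde-dev.broadinstitute.org'
```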
diff --git a/lambdas/service/openapi.json b/lambdas/service/openapi.json
index 6dc25db49..6148675ce 100644
--- a/lambdas/service/openapi.json
+++ b/lambdas/service/openapi.json
@@ -3,7 +3,7 @@
     "info": {
         "title": "azul_service",
         "description": "\n# Overview\n\nAzul is a REST web service for querying metadata associated with\nboth experimental and analysis data from a data repository. In order\nto deliver response times that make it suitable for interactive use\ncases, the set of metadata properties that it exposes for sorting,\nfiltering, and aggregation is limited. Azul provides a uniform view\nof the metadata over a range of diverse schemas, effectively\nshielding clients from changes in the schemas as they occur over\ntime. It does so, however, at the expense of detail in the set of\nmetadata properties it exposes and in the accuracy with which it\naggregates them.\n\nAzul denormalizes and aggregates metadata into several different\nindices for selected entity types. Metadata entities can be queried\nusing the [Index](#operations-tag-Index) endpoints.\n\nA set of indices forms a catalog. There is a default catalog called\n`dcp2` which will be used unless a\ndifferent catalog name is specified using the `catalog` query\nparameter. Metadata from different catalogs is completely\nindependent: a response obtained by querying one catalog does not\nnecessarily correlate to a response obtained by querying another\none. Two catalogs can contain metadata from the same sources or\ndifferent sources. It is only guaranteed that the body of a\nresponse by any given endpoint adheres to one schema,\nindependently of which catalog was specified in the request.\n\nAzul provides the ability to download data and metadata via the\n[Manifests](#operations-tag-Manifests) endpoints. The\n`curl` format manifests can be used to\ndownload data files. Other formats provide various views of the\nmetadata. Manifests can be generated for a selection of files using\nfilters. These filters are interchangeable with the filters used by\nthe [Index](#operations-tag-Index) endpoints.\n\nAzul also provides a [summary](#operations-Index-get_index_summary)\nview of indexed data.\n\n## Data model\n\nAny index, when queried, returns a JSON array of hits. Each hit\nrepresents a metadata entity. Nested in each hit is a summary of the\nproperties of entities associated with the hit. An entity is\nassociated either by a direct edge in the original metadata graph,\nor indirectly as a series of edges. The nested properties are\ngrouped by the type of the associated entity. The properties of all\ndata files associated with a particular sample, for example, are\nlisted under `hits[*].files` in a `/index/samples` response. It is\nimportant to note that while each _hit_ represents a discrete\nentity, the properties nested within that hit are the result of an\naggregation over potentially many associated entities.\n\nTo illustrate this, consider a data file that is part of two\nprojects (a project is a group of related experiments, typically by\none laboratory, institution or consortium). Querying the `files`\nindex for this file yields a hit looking something like:\n\n```\n{\n    \"projects\": [\n        {\n            \"projectTitle\": \"Project One\"\n            \"laboratory\": ...,\n            ...\n        },\n        {\n            \"projectTitle\": \"Project Two\"\n            \"laboratory\": ...,\n            ...\n        }\n    ],\n    \"files\": [\n        {\n            \"format\": \"pdf\",\n            \"name\": \"Team description.pdf\",\n            ...\n        }\n    ]\n}\n```\n\nThis example hit contains two kinds of nested entities (a hit in an\nactual Azul response will contain more): There are the two projects\nentities, and the file itself. These nested entities contain\nselected metadata properties extracted in a consistent way. This\nmakes filtering and sorting simple.\n\nAlso notice that there is only one file. When querying a particular\nindex, the corresponding entity will always be a singleton like\nthis.\n",
-        "version": "2.0"
+        "version": "2.1"
     },
     "tags": [
         {
diff --git a/src/azul/__init__.py b/src/azul/__init__.py
index d91abc9dd..85d46f9de 100644
--- a/src/azul/__init__.py
+++ b/src/azul/__init__.py
@@ -322,6 +322,11 @@ def tdr_service_url(self) -> mutable_furl:
     def sam_service_url(self) -> mutable_furl:
         return furl(self.environ['AZUL_SAM_SERVICE_URL'])
 
+    @property
+    def duos_service_url(self) -> Optional[mutable_furl]:
+        url = self.environ.get('AZUL_DUOS_SERVICE_URL')
+        return None if url is None else furl(url)
+
     @property
     def dss_query_prefix(self) -> str:
         return self.environ.get('AZUL_DSS_QUERY_PREFIX', '')
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index b21ab34f9..66881a761 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -35,6 +35,9 @@
 from azul import (
     JSON,
 )
+from azul.collections import (
+    deep_dict_merge,
+)
 from azul.indexer import (
     BundleFQID,
     BundlePartition,
@@ -133,7 +136,7 @@ def field_types(cls) -> FieldTypes:
         return {
             'activities': cls._activity_types(),
             'biosamples': cls._biosample_types(),
-            'datasets': cls._dataset_types(),
+            'datasets': {**cls._dataset_types(), **cls._duos_types()},
             'diagnoses': cls._diagnosis_types(),
             'donors': cls._donor_types(),
             'files': cls._aggregate_file_types(),
@@ -229,6 +232,13 @@ def _biosample_types(cls) -> FieldTypes:
             'donor_age_at_collection': pass_thru_json,
         }
 
+    @classmethod
+    def _duos_types(cls) -> FieldTypes:
+        return {
+            'document_id': null_str,
+            'description': null_str,
+        }
+
     @classmethod
     def _dataset_types(cls) -> FieldTypes:
         return {
@@ -378,6 +388,9 @@ def _biosample(self, biosample: EntityReference) -> MutableJSON:
     def _dataset(self, dataset: EntityReference) -> MutableJSON:
         return self._entity(dataset, self._dataset_types())
 
+    def _duos(self, dataset: EntityReference) -> MutableJSON:
+        return self._entity(dataset, self._duos_types())
+
     def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON:
         return self._entity(diagnosis,
                             self._diagnosis_types(),
@@ -418,7 +431,29 @@ def reconcile_inner_entities(cls,
                                  ) -> tuple[JSON, BundleFQID]:
         this_entity, this_bundle = this
         that_entity, that_bundle = that
-        return that if that_bundle.version > this_bundle.version else this
+        if this_entity.keys() == that_entity.keys():
+            return that if that_bundle.version > this_bundle.version else this
+        else:
+            assert entity_type == 'datasets', (entity_type, this, that)
+            expected_keys = cls.field_types()[entity_type].keys()
+            # There will be one contribution for a DUOS stub, and many redundant
+            # contributions (one per non-DUOS bundle) for the dataset metadata
+            # from BigQuery. Once the stub has been merged with a single main
+            # contribution to consolidate all expected fields, we can disregard
+            # the other contributions as usual.
+            if this_entity.keys() == expected_keys:
+                return this
+            elif that_entity.keys() == expected_keys:
+                return that
+            else:
+                assert this_bundle.version == that_bundle.version, (this, that)
+                assert this_entity.keys() < expected_keys, this
+                assert that_entity.keys() < expected_keys, that
+                merged = deep_dict_merge((this_entity, that_entity))
+                assert merged.keys() == expected_keys, (this, that)
+                # We can safely discard that_bundle because only the version is
+                # used by the caller, and we know the versions are equal.
+                return merged, this_bundle
 
 
 class ActivityTransformer(BaseTransformer):
@@ -469,17 +504,22 @@ def entity_type(cls) -> str:
         return 'datasets'
 
     def _transform(self, entity: EntityReference) -> Contribution:
-        contents = dict(
-            activities=self._entities(self._activity, chain.from_iterable(
-                self._entities_by_type[activity_type]
-                for activity_type in self._activity_polymorphic_types
-            )),
-            biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
-            datasets=[self._dataset(entity)],
-            diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
-            donors=self._entities(self._donor, self._entities_by_type['donor']),
-            files=self._entities(self._file, self._entities_by_type['file']),
-        )
+        try:
+            dataset = self._dataset(entity)
+        except KeyError:
+            contents = dict(datasets=[self._duos(entity)])
+        else:
+            contents = dict(
+                activities=self._entities(self._activity, chain.from_iterable(
+                    self._entities_by_type[activity_type]
+                    for activity_type in self._activity_polymorphic_types
+                )),
+                biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
+                datasets=[dataset],
+                diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
+                donors=self._entities(self._donor, self._entities_by_type['donor']),
+                files=self._entities(self._file, self._entities_by_type['file']),
+            )
         return self._contribution(contents, entity)
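The reconciliation above is the subtle part of this change, so here is a toy walk-through (illustration only). `deep_merge` is a local stand-in for `azul.collections.deep_dict_merge`, assumed here to behave as a recursive union over mappings with disjoint leaf keys; the `title` field value is hypothetical, while `document_id` and `registered_identifier` are taken from the tests below.

```python
from collections.abc import Mapping


def deep_merge(dicts) -> dict:
    # Local stand-in for azul.collections.deep_dict_merge: recursively
    # union the given mappings, assuming leaf keys do not conflict
    merged: dict = {}
    for d in dicts:
        for k, v in d.items():
            if isinstance(v, Mapping) and isinstance(merged.get(k), Mapping):
                merged[k] = deep_merge((merged[k], v))
            else:
                merged[k] = v
    return merged


expected_keys = {'document_id', 'title', 'registered_identifier', 'description'}

# The sparse contribution from the DUOS stub bundle carries only `description`
duos_stub = {'description': 'Study description from DUOS'}
# A main contribution carries the BigQuery-backed dataset fields
main = {
    'document_id': '2370f948-2783-4eb6-afea-e022897f4dcf',
    'title': 'ANVIL_CMG',  # hypothetical value
    'registered_identifier': ['phs000693'],
}

merged = deep_merge((duos_stub, main))
assert merged.keys() == expected_keys
# Every further contribution is a proper subset of expected_keys and loses to
# the merged entity, so reconciliation degenerates to the version comparison.
```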
""" primary: EntityType = 'biosample' supplementary: EntityType = 'file' + duos: EntityType = 'dataset' class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON): @@ -178,7 +191,8 @@ def _list_bundles(self, validate_uuid_prefix(partition_prefix) primary = BundleEntityType.primary.value supplementary = BundleEntityType.supplementary.value - rows = self._run_sql(f''' + dataset = BundleEntityType.duos.value + rows = list(self._run_sql(f''' SELECT datarepo_row_id, {primary!r} AS entity_type FROM {backtick(self._full_table_name(spec, primary))} WHERE STARTS_WITH(datarepo_row_id, '{partition_prefix}') @@ -186,18 +200,37 @@ def _list_bundles(self, SELECT datarepo_row_id, {supplementary!r} AS entity_type FROM {backtick(self._full_table_name(spec, supplementary))} AS supp WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{partition_prefix}') - ''') - return [ - TDRAnvilBundleFQID(source=source, - # Reversibly tweak the entity UUID to prevent - # collisions between entity IDs and bundle IDs - uuid=uuids.change_version(row['datarepo_row_id'], - self.datarepo_row_uuid_version, - self.bundle_uuid_version), - version=self._version, - entity_type=BundleEntityType(row['entity_type'])) - for row in rows - ] + ''' + ( + '' if config.duos_service_url is None else f''' + UNION ALL + SELECT datarepo_row_id, {dataset!r} AS entity_type + FROM {backtick(self._full_table_name(spec, dataset))} + ''' + ))) + bundles = [] + dataset_count = 0 + for row in rows: + # We intentionally omit the WHERE clause for datasets so that we can + # verify our assumption that each snapshot only contains rows for a + # single dataset. This verification is performed independently and + # concurrently for every partition, but only one partition actually + # emits the bundle. + if row['entity_type'] == dataset: + require(0 == dataset_count) + dataset_count += 1 + if not row['datarepo_row_id'].startswith(partition_prefix): + continue + bundles.append(TDRAnvilBundleFQID( + source=source, + # Reversibly tweak the entity UUID to prevent + # collisions between entity IDs and bundle IDs + uuid=uuids.change_version(row['datarepo_row_id'], + self.datarepo_row_uuid_version, + self.bundle_uuid_version), + version=self._version, + entity_type=BundleEntityType(row['entity_type']) + )) + return bundles def list_partitions(self, source: TDRSourceRef @@ -250,6 +283,10 @@ def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: elif bundle_fqid.entity_type is BundleEntityType.supplementary: log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid) return self._supplementary_bundle(bundle_fqid) + elif bundle_fqid.entity_type is BundleEntityType.duos: + assert config.duos_service_url is not None, bundle_fqid + log.info('Bundle %r is a dataset description', bundle_fqid.uuid) + return self._dataset_description(bundle_fqid) else: assert False, bundle_fqid.entity_type @@ -334,6 +371,19 @@ def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBund result.add_links({Link(**link_args)}, entities_by_key) return result + def _dataset_description(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle: + description = self.tdr.get_duos(bundle_fqid.source)['studyDescription'] + entity_id = change_version(bundle_fqid.uuid, + self.bundle_uuid_version, + self.datarepo_row_uuid_version) + entity = EntityReference(entity_type=bundle_fqid.entity_type.value, + entity_id=entity_id) + bundle = TDRAnvilBundle(fqid=bundle_fqid) + bundle.add_entity(entity=entity, + version=self._version, + row={'description': 
diff --git a/src/azul/terra.py b/src/azul/terra.py
index 2883499c4..b33a40472 100644
--- a/src/azul/terra.py
+++ b/src/azul/terra.py
@@ -554,6 +554,9 @@ def _job_info(self, job: QueryJob) -> JSON:
     def _repository_endpoint(self, *path: str) -> mutable_furl:
         return config.tdr_service_url.set(path=('api', 'repository', 'v1', *path))
 
+    def _duos_endpoint(self, *path: str) -> mutable_furl:
+        return config.duos_service_url.set(path=('api', *path))
+
     def _check_response(self,
                         endpoint: furl,
                         response: urllib3.HTTPResponse
@@ -658,3 +661,10 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient':
 
     def drs_client(self) -> DRSClient:
         return DRSClient(http_client=self._http_client)
+
+    def get_duos(self, source: SourceRef) -> MutableJSON:
+        response = self._retrieve_source(source)
+        duos_id = response['duosFirecloudGroup']['duosId']
+        url = self._duos_endpoint('dataset', 'registration', duos_id)
+        response = self._request('GET', url)
+        return self._check_response(url, response)
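For context on `get_duos`: it chains two services, first resolving the DUOS ID registered for the TDR snapshot, then fetching the dataset registration from DUOS. A rough sketch with `requests` (illustration only: the function name is hypothetical, Azul actually goes through its urllib3-based `_request`/`_check_response` helpers, and the TDR snapshot-retrieval endpoint behind `_retrieve_source` is an assumption here):

```python
import requests  # stand-in for Azul's urllib3-based client


def get_study_description(tdr_url: str, duos_url: str,
                          snapshot_id: str, token: str) -> str:
    """Hypothetical, simplified analog of TDRClient.get_duos."""
    headers = {'Authorization': f'Bearer {token}'}
    # Step 1: look up the snapshot in TDR to find its DUOS Firecloud group
    snapshot = requests.get(f'{tdr_url}/api/repository/v1/snapshots/{snapshot_id}',
                            headers=headers)
    snapshot.raise_for_status()
    duos_id = snapshot.json()['duosFirecloudGroup']['duosId']
    # Step 2: fetch the dataset registration from DUOS and pick the field
    # that _dataset_description stores as the dataset's `description`
    registration = requests.get(f'{duos_url}/api/dataset/registration/{duos_id}',
                                headers=headers)
    registration.raise_for_status()
    return registration.json()['studyDescription']
```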
diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py
index 17c8dad04..b9f2ae61c 100644
--- a/test/indexer/__init__.py
+++ b/test/indexer/__init__.py
@@ -19,6 +19,9 @@
 from elasticsearch.helpers import (
     scan,
 )
+from more_itertools import (
+    one,
+)
 
 from azul import (
     CatalogName,
@@ -30,6 +33,7 @@
     SourcedBundleFQID,
 )
 from azul.indexer.document import (
+    DocumentType,
     IndexName,
 )
 from azul.indexer.index_service import (
@@ -137,9 +141,22 @@ def _get_all_hits(self):
             index=','.join(self.index_service.index_names(self.catalog)),
             preserve_order=True))
         for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            if (
+                entity_type == 'datasets'
+                and doc_type is DocumentType.contribution
+                and 'description' in one(hit['_source']['contents']['datasets'])
+            ):
+                # Sparse dataset contributions contain no lists
+                continue
             self._verify_sorted_lists(hit['_source'])
         return hits
 
+    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
+        index_name = IndexName.parse(hit['_index'])
+        index_name.validate()
+        return index_name.entity_type, index_name.doc_type
+
     def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs:
         """
         Load the canned index documents for the given canned bundle and fix the
diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
new file mode 100644
index 000000000..4a9831df0
--- /dev/null
+++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
@@ -0,0 +1,8 @@
+{
+    "entities": {
+        "dataset/2370f948-2783-4eb6-afea-e022897f4dcf": {
+            "description": "Study description from DUOS"
+        }
+    },
+    "links": []
+}
\ No newline at end of file
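A note on the UUIDs in the canned file above: the file is named for the bundle UUID `2370f948-2783-aeb6-afea-e022897f4dcf`, while the entity key inside it uses `2370f948-2783-4eb6-afea-e022897f4dcf`. The two differ only in the UUID version nibble, which is how `azul.uuids.change_version` reversibly maps datarepo row IDs (version `4`) to bundle IDs (version `a`) and back. A simplified stand-in (the real implementation may differ) makes the tweak concrete:

```python
def change_version(uuid: str, old: str, new: str) -> str:
    # The version nibble is the first hex digit of the third group,
    # i.e. character 14 of the canonical string representation
    assert uuid[14] == old, uuid
    return uuid[:14] + new + uuid[15:]


bundle_uuid = '2370f948-2783-aeb6-afea-e022897f4dcf'
entity_id = change_version(bundle_uuid, 'a', '4')
assert entity_id == '2370f948-2783-4eb6-afea-e022897f4dcf'
# The tweak is reversible, preventing collisions between bundle and entity IDs
assert change_version(entity_id, '4', 'a') == bundle_uuid
```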
diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py
index f4cae3ab4..39a0d8064 100644
--- a/test/indexer/test_anvil.py
+++ b/test/indexer/test_anvil.py
@@ -1,6 +1,12 @@
+from collections import (
+    defaultdict,
+)
 from operator import (
     itemgetter,
 )
+from typing import (
+    cast,
+)
 import unittest
 
 from more_itertools import (
@@ -11,14 +17,17 @@
     CatalogName,
     config,
 )
-from azul.indexer import (
-    SourcedBundleFQID,
+from azul.indexer.document import (
+    DocumentType,
+    EntityReference,
 )
 from azul.logging import (
     configure_test_logging,
 )
 from azul.plugins.repository.tdr_anvil import (
+    BundleEntityType,
     TDRAnvilBundle,
+    TDRAnvilBundleFQID,
 )
 from indexer import (
     IndexerTestCase,
@@ -36,14 +45,26 @@ def setUpModule():
 class AnvilIndexerTestCase(IndexerTestCase, TDRAnvilPluginTestCase):
 
     @classmethod
-    def bundles(cls) -> list[SourcedBundleFQID]:
+    def bundle_fqid(cls,
+                    *,
+                    uuid,
+                    version,
+                    entity_type=BundleEntityType.primary
+                    ) -> TDRAnvilBundleFQID:
+        return TDRAnvilBundleFQID(source=cls.source,
+                                  uuid=uuid,
+                                  version=version,
+                                  entity_type=entity_type)
+
+    @classmethod
+    def bundles(cls) -> list[TDRAnvilBundleFQID]:
         return [
             cls.bundle_fqid(uuid='826dea02-e274-affe-aabc-eb3db63ad068',
                             version='')
         ]
 
     @property
-    def bundle(self) -> SourcedBundleFQID:
+    def bundle(self) -> TDRAnvilBundleFQID:
         return one(self.bundles())
 
     @classmethod
@@ -76,6 +97,39 @@ def test_indexing(self):
         expected_hits = self._load_canned_result(self.bundle)
         self.assertEqual(expected_hits, hits)
 
+    def test_dataset_description(self):
+        dataset_ref = EntityReference(entity_type='dataset',
+                                      entity_id='2370f948-2783-4eb6-afea-e022897f4dcf')
+        dataset_bundle = self.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
+                                          version=self.bundle.version,
+                                          entity_type=BundleEntityType.duos)
+
+        bundles = [self.bundle, dataset_bundle]
+        for bundle_fqid in bundles:
+            bundle = cast(TDRAnvilBundle, self._load_canned_bundle(bundle_fqid))
+            bundle.links.clear()
+            bundle.entities = {dataset_ref: bundle.entities[dataset_ref]}
+            self._index_bundle(bundle, delete=False)
+
+        hits = self._get_all_hits()
+        doc_counts: dict[DocumentType, int] = defaultdict(int)
+        for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            doc_counts[doc_type] += 1
+            self.assertEqual('datasets', entity_type)
+            if doc_type is DocumentType.aggregate:
+                self.assertEqual(2, hit['_source']['num_contributions'])
+                self.assertEqual(sorted(b.uuid for b in bundles),
+                                 sorted(b['uuid'] for b in hit['_source']['bundles']))
+                contents = one(hit['_source']['contents']['datasets'])
+                # These fields are populated only in the primary bundle
+                self.assertEqual(dataset_ref.entity_id, contents['document_id'])
+                self.assertEqual(['phs000693'], contents['registered_identifier'])
+                # This field is populated only in the sparse dataset bundle
+                self.assertEqual('Study description from DUOS', contents['description'])
+        self.assertEqual(1, doc_counts[DocumentType.aggregate])
+        self.assertEqual(2, doc_counts[DocumentType.contribution])
+
 
 # FIXME: Enable test after the issue with TinyQuery `WITH` has been resolved
 #        https://github.com/DataBiosphere/azul/issues/5046
 @unittest.skip('TinyQuery does not support the WITH clause')
diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py
index 7af796542..aec29a6f3 100644
--- a/test/indexer/test_indexer.py
+++ b/test/indexer/test_indexer.py
@@ -216,11 +216,6 @@ def test_deletion(self):
         self.index_service.delete_indices(self.catalog)
         self.index_service.create_indices(self.catalog)
 
-    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
-        index_name = IndexName.parse(hit['_index'])
-        index_name.validate()
-        return index_name.entity_type, index_name.doc_type
-
     def _filter_hits(self,
                      hits: JSONs,
                      doc_type: Optional[DocumentType] = None,
diff --git a/test/integration_test.py b/test/integration_test.py
index 242efa6dc..771e9703a 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -1597,11 +1597,14 @@ def _test_catalog(self, catalog: config.Catalog):
             self.assertIsInstance(entities, dict)
             self.assertIsInstance(links, list)
             entities = set(map(EntityReference.parse, entities.keys()))
-            linked_entities = frozenset.union(*(
-                Link.from_json(link).all_entities
-                for link in links
-            ))
-            self.assertEqual(entities, linked_entities)
+            if len(entities) > 1:
+                linked_entities = frozenset().union(*(
+                    Link.from_json(link).all_entities
+                    for link in links
+                ))
+                self.assertEqual(entities, linked_entities)
+            else:
+                self.assertEqual([], links)
         else:
             assert False, metadata_plugin_name
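Finally, the integration-test change encodes an invariant that can be stated compactly: regular AnVIL bundles must have every entity covered by some link, whereas a DUOS bundle consists of a single dataset entity and carries no links at all. A sketch under those assumptions (`validate_bundle` is a hypothetical distillation, with entities and links reduced to plain sets of entity strings):

```python
def validate_bundle(entities: set[str], links: list[set[str]]) -> None:
    """Hypothetical distillation of the invariant checked above."""
    if len(entities) > 1:
        # Multi-entity bundles: the union of linked entities covers the bundle
        linked = frozenset().union(*links)
        assert entities == linked, entities.symmetric_difference(linked)
    else:
        # Single-entity bundles, i.e. DUOS stubs: no links at all
        assert links == [], links


# A primary bundle: every entity appears in some link
validate_bundle({'dataset/d1', 'biosample/b1', 'file/f1'},
                [{'dataset/d1', 'biosample/b1'}, {'biosample/b1', 'file/f1'}])
# A DUOS stub bundle: one dataset entity, no links
validate_bundle({'dataset/d1'}, [])
```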