diff --git a/deployments/anvilbox/environment.py b/deployments/anvilbox/environment.py
index ee6555aea..e5e3c15a4 100644
--- a/deployments/anvilbox/environment.py
+++ b/deployments/anvilbox/environment.py
@@ -141,6 +141,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
         **(
             {
diff --git a/deployments/anvildev/environment.py b/deployments/anvildev/environment.py
index fa599604a..4a66afcbb 100644
--- a/deployments/anvildev/environment.py
+++ b/deployments/anvildev/environment.py
@@ -115,6 +115,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',
         'AZUL_ENABLE_MONITORING': '1',
diff --git a/lambdas/service/app.py b/lambdas/service/app.py
index e1f2c136f..354c906f9 100644
--- a/lambdas/service/app.py
+++ b/lambdas/service/app.py
@@ -227,7 +227,7 @@
         # changes and reset the minor version to zero. Otherwise, increment only
         # the minor version for backwards compatible changes. A backwards
         # compatible change is one that does not require updates to clients.
-        'version': '2.0'
+        'version': '2.1'
     },
     'tags': [
         {
diff --git a/src/azul/__init__.py b/src/azul/__init__.py
index 422ae9560..fcff4791e 100644
--- a/src/azul/__init__.py
+++ b/src/azul/__init__.py
@@ -312,6 +312,10 @@ def tdr_service_url(self) -> mutable_furl:
     def sam_service_url(self) -> mutable_furl:
         return furl(self.environ['AZUL_SAM_SERVICE_URL'])
 
+    @property
+    def duos_service_url(self) -> mutable_furl:
+        return furl(self.environ['AZUL_DUOS_SERVICE_URL'])
+
     @property
     def dss_query_prefix(self) -> str:
         return self.environ.get('AZUL_DSS_QUERY_PREFIX', '')
diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py
index b9be57f9d..a3ecf70fe 100644
--- a/src/azul/indexer/index_service.py
+++ b/src/azul/indexer/index_service.py
@@ -44,6 +44,9 @@
     config,
     freeze,
 )
+from azul.collections import (
+    deep_dict_merge,
+)
 from azul.deployment import (
     aws,
 )
@@ -717,15 +720,19 @@ def _select_latest(self,
             cur_bundle_uuid, cur_bundle_version, cur_entity = \
                 collated_entities.get(entity_id, (None, '', None))
             if cur_entity is not None and entity.keys() != cur_entity.keys():
-                symmetric_difference = set(entity.keys()).symmetric_difference(cur_entity)
-                log.warning('Document shape of `%s` entity `%s` does not match between bundles '
-                            '%s, version %s and %s, version %s: %s',
-                            entity_type, entity_id,
-                            cur_bundle_uuid, cur_bundle_version,
-                            contribution.coordinates.bundle.uuid,
-                            contribution.coordinates.bundle.version,
-                            symmetric_difference)
-            if cur_bundle_version < contribution.coordinates.bundle.version:
+                if cur_bundle_version == contribution.coordinates.bundle.version:
+                    assert contribution.entity.entity_type == 'datasets', contribution
+                    entity = deep_dict_merge((entity, cur_entity))
+                else:
+                    symmetric_difference = set(entity.keys()).symmetric_difference(cur_entity)
+                    log.warning('Document shape of `%s` entity `%s` does not match between bundles '
+                                '%s, version %s and %s, version %s: %s',
+                                entity_type, entity_id,
+                                cur_bundle_uuid, cur_bundle_version,
+                                contribution.coordinates.bundle.uuid,
+                                contribution.coordinates.bundle.version,
+                                symmetric_difference)
+            if cur_bundle_version <= contribution.coordinates.bundle.version:
                 collated_entities[entity_id] = (
                     contribution.coordinates.bundle.uuid,
                     contribution.coordinates.bundle.version,
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index 42ae2e225..2bbda3f74 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -125,7 +125,7 @@ def field_types(cls) -> FieldTypes:
         return {
             'activities': cls._activity_types(),
             'biosamples': cls._biosample_types(),
-            'datasets': cls._dataset_types(),
+            'datasets': {**cls._dataset_types(), **cls._sparse_dataset_types()},
             'diagnoses': cls._diagnosis_types(),
             'donors': cls._donor_types(),
             'files': cls._aggregate_file_types(),
@@ -221,6 +221,13 @@ def _biosample_types(cls) -> FieldTypes:
             'donor_age_at_collection': pass_thru_json,
         }
 
+    @classmethod
+    def _sparse_dataset_types(cls) -> FieldTypes:
+        return {
+            'document_id': null_str,
+            'description': null_str,
+        }
+
     @classmethod
     def _dataset_types(cls) -> FieldTypes:
         return {
@@ -370,6 +377,9 @@ def _biosample(self, biosample: EntityReference) -> MutableJSON:
     def _dataset(self, dataset: EntityReference) -> MutableJSON:
         return self._entity(dataset, self._dataset_types())
 
+    def _sparse_dataset(self, dataset: EntityReference) -> MutableJSON:
+        return self._entity(dataset, self._sparse_dataset_types())
+
     def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON:
         return self._entity(diagnosis,
                             self._diagnosis_types(),
@@ -446,17 +456,22 @@ def entity_type(cls) -> str:
         return 'datasets'
 
     def _transform(self, entity: EntityReference) -> Contribution:
-        contents = dict(
-            activities=self._entities(self._activity, chain.from_iterable(
-                self._entities_by_type[activity_type]
-                for activity_type in self._activity_polymorphic_types
-            )),
-            biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
-            datasets=[self._dataset(entity)],
-            diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
-            donors=self._entities(self._donor, self._entities_by_type['donor']),
-            files=self._entities(self._file, self._entities_by_type['file']),
-        )
+        try:
+            dataset = self._dataset(entity)
+        except KeyError:
+            contents = dict(datasets=[self._sparse_dataset(entity)])
+        else:
+            contents = dict(
+                activities=self._entities(self._activity, chain.from_iterable(
+                    self._entities_by_type[activity_type]
+                    for activity_type in self._activity_polymorphic_types
+                )),
+                biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
+                datasets=[dataset],
+                diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
+                donors=self._entities(self._donor, self._entities_by_type['donor']),
+                files=self._entities(self._file, self._entities_by_type['file']),
+            )
         return self._contribution(contents, entity)
diff --git a/src/azul/plugins/repository/tdr_anvil/__init__.py b/src/azul/plugins/repository/tdr_anvil/__init__.py
index 2ad29f0df..c98e72ca6 100644
--- a/src/azul/plugins/repository/tdr_anvil/__init__.py
+++ b/src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -60,6 +60,7 @@
     MutableJSONs,
 )
 from azul.uuids import (
+    change_version,
     validate_uuid_prefix,
 )
 
@@ -98,9 +99,21 @@ class BundleEntityType(Enum):
     specifically tailored traversal implementation. Supplementary bundles
     always consist of exactly two entities: one file (the bundle entity) and
     one dataset.
+
+    The `dataset.description` field is unusual in that it is not stored in
+    BigQuery and must be retrieved via Terra's DUOS API. There is only one
+    dataset per snapshot, which is referenced in all primary and supplementary
+    bundles. Therefore, only one request to DUOS per *snapshot* is necessary,
+    but if `description` is retrieved at the same time as the other dataset
+    fields, we will make one request per *bundle* instead, potentially
+    overloading the DUOS service. Our solution is to retrieve `description`
+    only in a dedicated bundle format, once per snapshot, and merge it with
+    the other dataset fields during aggregation. This bundle contains only a
+    single dataset entity with only the `description` field populated.
     """
     primary: EntityType = 'biosample'
     supplementary: EntityType = 'file'
+    dataset: EntityType = 'dataset'
 
 
 class AnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
@@ -178,7 +191,8 @@ def _list_bundles(self,
         validate_uuid_prefix(partition_prefix)
         primary = BundleEntityType.primary.value
         supplementary = BundleEntityType.supplementary.value
-        rows = self._run_sql(f'''
+        dataset = BundleEntityType.dataset.value
+        rows = list(self._run_sql(f'''
             SELECT datarepo_row_id, {primary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, primary))}
             WHERE STARTS_WITH(datarepo_row_id, '{partition_prefix}')
@@ -186,18 +200,34 @@
             SELECT datarepo_row_id, {supplementary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
             WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{partition_prefix}')
-        ''')
-        return [
-            AnvilBundleFQID(source=source,
-                            # Reversibly tweak the entity UUID to prevent
-                            # collisions between entity IDs and bundle IDs
-                            uuid=uuids.change_version(row['datarepo_row_id'],
-                                                      self.datarepo_row_uuid_version,
-                                                      self.bundle_uuid_version),
-                            version=self._version,
-                            entity_type=BundleEntityType(row['entity_type']))
-            for row in rows
-        ]
+            UNION ALL
+            SELECT datarepo_row_id, {dataset!r} AS entity_type
+            FROM {backtick(self._full_table_name(spec, dataset))}
+        '''))
+        bundles = []
+        dataset_count = 0
+        for row in rows:
+            # We intentionally omit the WHERE clause for datasets so that we can
+            # verify our assumption that each snapshot only contains rows for a
+            # single dataset. This verification is performed independently and
+            # concurrently for every partition, but only one partition actually
+            # emits the bundle.
+            if row['entity_type'] == dataset:
+                require(0 == dataset_count)
+                dataset_count += 1
+                if not row['datarepo_row_id'].startswith(partition_prefix):
+                    continue
+            bundles.append(AnvilBundleFQID(
+                source=source,
+                # Reversibly tweak the entity UUID to prevent
+                # collisions between entity IDs and bundle IDs
+                uuid=uuids.change_version(row['datarepo_row_id'],
+                                          self.datarepo_row_uuid_version,
+                                          self.bundle_uuid_version),
+                version=self._version,
+                entity_type=BundleEntityType(row['entity_type'])
+            ))
+        return bundles
 
     def list_partitions(self,
                         source: TDRSourceRef
@@ -250,6 +280,9 @@ def _emulate_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         elif bundle_fqid.entity_type is BundleEntityType.supplementary:
             log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
             return self._supplementary_bundle(bundle_fqid)
+        elif bundle_fqid.entity_type is BundleEntityType.dataset:
+            log.info('Bundle %r is a dataset description', bundle_fqid.uuid)
+            return self._dataset_description(bundle_fqid)
         else:
             assert False, bundle_fqid.entity_type
 
@@ -334,6 +367,19 @@ def _supplementary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         result.add_links({Link(**link_args)}, entities_by_key)
         return result
 
+    def _dataset_description(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
+        description = self.tdr.get_duos(bundle_fqid.source)['studyDescription']
+        entity_id = change_version(bundle_fqid.uuid,
+                                   self.bundle_uuid_version,
+                                   self.datarepo_row_uuid_version)
+        entity = EntityReference(entity_type=bundle_fqid.entity_type.value,
+                                 entity_id=entity_id)
+        bundle = TDRAnvilBundle(fqid=bundle_fqid)
+        bundle.add_entity(entity=entity,
+                          version=self._version,
+                          row={'description': description})
+        return bundle
+
     def _bundle_entity(self, bundle_fqid: AnvilBundleFQID) -> KeyReference:
         source = bundle_fqid.source
         bundle_uuid = bundle_fqid.uuid
diff --git a/src/azul/terra.py b/src/azul/terra.py
index 578f9025a..abefa102a 100644
--- a/src/azul/terra.py
+++ b/src/azul/terra.py
@@ -555,6 +555,9 @@ def _job_info(self, job: QueryJob) -> JSON:
     def _repository_endpoint(self, *path: str) -> mutable_furl:
         return config.tdr_service_url.set(path=('api', 'repository', 'v1', *path))
 
+    def _duos_endpoint(self, *path: str) -> mutable_furl:
+        return config.duos_service_url.set(path=('api', *path))
+
     def _check_response(self,
                         endpoint: furl,
                         response: urllib3.HTTPResponse
@@ -659,3 +662,10 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient':
 
     def drs_client(self) -> DRSClient:
         return DRSClient(http_client=self._http_client)
+
+    def get_duos(self, source: SourceRef) -> MutableJSON:
+        response = self._retrieve_source(source)
+        duos_id = response['duosFirecloudGroup']['duosId']
+        url = self._duos_endpoint('dataset', 'registration', duos_id)
+        response = self._request('GET', url)
+        return self._check_response(url, response)
diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py
index c73f3bd10..ceb40fcb0 100644
--- a/test/indexer/__init__.py
+++ b/test/indexer/__init__.py
@@ -15,6 +15,9 @@
 from elasticsearch.helpers import (
     scan,
 )
+from more_itertools import (
+    one,
+)
 
 from azul import (
     CatalogName,
@@ -26,6 +29,7 @@
     SourcedBundleFQID,
 )
 from azul.indexer.document import (
+    DocumentType,
     IndexName,
 )
 from azul.indexer.index_service import (
@@ -133,9 +137,22 @@ def _get_all_hits(self):
                         index=','.join(self.index_service.index_names(self.catalog)),
                         preserve_order=True))
         for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            if (
+                entity_type == 'datasets'
+                and doc_type is DocumentType.contribution
+                and 'description' in one(hit['_source']['contents']['datasets'])
+            ):
+                # Sparse dataset contributions contain no lists
+                continue
             self._verify_sorted_lists(hit['_source'])
         return hits
 
+    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
+        index_name = IndexName.parse(hit['_index'])
+        index_name.validate()
+        return index_name.entity_type, index_name.doc_type
+
     def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs:
         """
         Load the canned index documents for the given canned bundle and fix the
diff --git a/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
new file mode 100644
index 000000000..4a9831df0
--- /dev/null
+++ b/test/indexer/data/2370f948-2783-aeb6-afea-e022897f4dcf.tdr.anvil.json
@@ -0,0 +1,8 @@
+{
+    "entities": {
+        "dataset/2370f948-2783-4eb6-afea-e022897f4dcf": {
+            "description": "Study description from DUOS"
+        }
+    },
+    "links": []
+}
\ No newline at end of file
diff --git a/test/indexer/test_anvil.py b/test/indexer/test_anvil.py
index 762714936..141434d7f 100644
--- a/test/indexer/test_anvil.py
+++ b/test/indexer/test_anvil.py
@@ -1,6 +1,12 @@
+from collections import (
+    defaultdict,
+)
 from operator import (
     itemgetter,
 )
+from typing import (
+    cast,
+)
 import unittest
 
 from more_itertools import (
@@ -11,13 +17,16 @@
     CatalogName,
     config,
 )
-from azul.indexer import (
-    SourcedBundleFQID,
+from azul.indexer.document import (
+    DocumentType,
+    EntityReference,
 )
 from azul.logging import (
     configure_test_logging,
 )
 from azul.plugins.repository.tdr_anvil import (
+    AnvilBundleFQID,
+    BundleEntityType,
     TDRAnvilBundle,
 )
 from indexer import (
@@ -36,14 +45,26 @@ def setUpModule():
 class AnvilIndexerTestCase(IndexerTestCase, TDRAnvilPluginTestCase):
 
     @classmethod
-    def bundles(cls) -> list[SourcedBundleFQID]:
+    def bundle_fqid(cls,
+                    *,
+                    uuid,
+                    version,
+                    entity_type=BundleEntityType.primary
+                    ) -> AnvilBundleFQID:
+        return AnvilBundleFQID(source=cls.source,
+                               uuid=uuid,
+                               version=version,
+                               entity_type=entity_type)
+
+    @classmethod
+    def bundles(cls) -> list[AnvilBundleFQID]:
         return [
             cls.bundle_fqid(uuid='826dea02-e274-affe-aabc-eb3db63ad068',
                             version='foo')
         ]
 
     @property
-    def bundle(self) -> SourcedBundleFQID:
+    def bundle(self) -> AnvilBundleFQID:
         return one(self.bundles())
 
     @classmethod
@@ -76,6 +97,39 @@ def test_indexing(self):
         expected_hits = self._load_canned_result(self.bundle)
         self.assertEqual(expected_hits, hits)
 
+    def test_dataset_description(self):
+        dataset_ref = EntityReference(entity_type='dataset',
+                                      entity_id='2370f948-2783-4eb6-afea-e022897f4dcf')
+        dataset_bundle = self.bundle_fqid(uuid='2370f948-2783-aeb6-afea-e022897f4dcf',
+                                          version=self.bundle.version,
+                                          entity_type=BundleEntityType.dataset)
+
+        bundles = [self.bundle, dataset_bundle]
+        for bundle_fqid in bundles:
+            bundle = cast(TDRAnvilBundle, self._load_canned_bundle(bundle_fqid))
+            bundle.links.clear()
+            bundle.entities = {dataset_ref: bundle.entities[dataset_ref]}
+            self._index_bundle(bundle, delete=False)
+
+        hits = self._get_all_hits()
+        doc_counts = defaultdict(int)
+        for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            doc_counts[doc_type] += 1
+            self.assertEqual('datasets', entity_type)
+            if doc_type is DocumentType.aggregate:
+                self.assertEqual(2, hit['_source']['num_contributions'])
+                self.assertEqual(sorted(b.uuid for b in bundles),
+                                 sorted(b['uuid'] for b in hit['_source']['bundles']))
+                contents = one(hit['_source']['contents']['datasets'])
+                # These fields are populated only in the primary bundle
+                self.assertEqual(dataset_ref.entity_id, contents['document_id'])
+                self.assertEqual(['phs000693'], contents['registered_identifier'])
+                # This field is populated only in the sparse dataset bundle
+                self.assertEqual('Study description from DUOS', contents['description'])
+        self.assertEqual(1, doc_counts[DocumentType.aggregate])
+        self.assertEqual(2, doc_counts[DocumentType.contribution])
+
 # FIXME: Enable test after the issue with TinyQuery `WITH` has been resolved
 #        https://github.com/DataBiosphere/azul/issues/5046
 @unittest.skip('TinyQuery does not support the WITH clause')
diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py
index 80b9330fd..c0e53ed4d 100644
--- a/test/indexer/test_indexer.py
+++ b/test/indexer/test_indexer.py
@@ -212,11 +212,6 @@ def test_deletion(self):
         self.index_service.delete_indices(self.catalog)
         self.index_service.create_indices(self.catalog)
 
-    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
-        index_name = IndexName.parse(hit['_index'])
-        index_name.validate()
-        return index_name.entity_type, index_name.doc_type
-
     def _filter_hits(self,
                      hits: JSONs,
                      doc_type: Optional[DocumentType] = None,
diff --git a/test/integration_test.py b/test/integration_test.py
index b28e5849c..c5ce12315 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -1581,11 +1581,14 @@ def _test_catalog(self, catalog: config.Catalog):
             self.assertIsInstance(entities, dict)
             self.assertIsInstance(links, list)
             entities = set(map(EntityReference.parse, entities.keys()))
-            linked_entities = frozenset.union(*(
-                Link.from_json(link).all_entities
-                for link in links
-            ))
-            self.assertEqual(entities, linked_entities)
+            if len(entities) > 1:
+                linked_entities = frozenset().union(*(
+                    Link.from_json(link).all_entities
+                    for link in links
+                ))
+                self.assertEqual(entities, linked_entities)
+            else:
+                self.assertEqual([], links)
         else:
             assert False, metadata_plugin_name
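
---

Note on the `_select_latest` change: when the sparse dataset contribution
(which carries only `document_id` and `description`) and the full dataset
contribution share a bundle version, the new branch merges them with
`deep_dict_merge`, which this diff imports from `azul.collections` but does
not show. The sketch below illustrates the merge semantics the branch relies
on, using a hypothetical stand-in; the real implementation may handle
conflicting values differently:

    from collections.abc import Mapping
    from typing import Any

    def merge_sketch(dicts) -> dict[str, Any]:
        # Hypothetical stand-in for azul.collections.deep_dict_merge:
        # union the keys, merge nested mappings recursively, and keep the
        # first occurrence of a non-mapping value.
        merged: dict[str, Any] = {}
        for d in dicts:
            for k, v in d.items():
                if isinstance(v, Mapping) and isinstance(merged.get(k), Mapping):
                    merged[k] = merge_sketch((merged[k], v))
                else:
                    merged.setdefault(k, v)
        return merged

    # The two contributions have disjoint fields apart from `document_id`,
    # so the merge yields one complete dataset document:
    full = {'document_id': 'd1', 'registered_identifier': ['phs000693']}
    sparse = {'document_id': 'd1', 'description': 'Study description from DUOS'}
    assert merge_sketch((sparse, full)) == {
        'document_id': 'd1',
        'registered_identifier': ['phs000693'],
        'description': 'Study description from DUOS',
    }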
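Note on the UUID tweak in `_list_bundles` and `_dataset_description`: both
directions go through `azul.uuids.change_version`, which the diff imports but
does not show. A sketch of the assumed behavior, checked against the canned
IDs used in `test_dataset_description` (datarepo row IDs are version-4 UUIDs,
derived bundle IDs version 10, hex 'a'):

    def change_version_sketch(uuid: str, old: int, new: int) -> str:
        # Assumed behavior: replace the version nibble (the first hex digit
        # of the third group) with `new`, after checking that it currently
        # equals `old`, so the tweak is reversible.
        groups = uuid.split('-')
        assert int(groups[2][0], 16) == old, uuid
        groups[2] = format(new, 'x') + groups[2][1:]
        return '-'.join(groups)

    row_id = '2370f948-2783-4eb6-afea-e022897f4dcf'
    bundle_id = change_version_sketch(row_id, 4, 10)
    assert bundle_id == '2370f948-2783-aeb6-afea-e022897f4dcf'
    assert change_version_sketch(bundle_id, 10, 4) == row_id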
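Note on the DUOS request flow: `get_duos` first retrieves the snapshot's
source from TDR to obtain the DUOS ID from `duosFirecloudGroup`, then queries
the consent service. A minimal sketch of the endpoint construction with furl,
assuming the anvildev value of `AZUL_DUOS_SERVICE_URL` and a hypothetical
DUOS ID:

    from furl import furl

    duos_service_url = furl('https://consent.dsde-dev.broadinstitute.org')
    # Hypothetical ID; the real one comes from the snapshot's
    # `duosFirecloudGroup` record in TDR
    duos_id = 'DUOS-000123'
    # Mirrors _duos_endpoint('dataset', 'registration', duos_id) above
    url = duos_service_url.set(path=('api', 'dataset', 'registration', duos_id))
    assert str(url) == ('https://consent.dsde-dev.broadinstitute.org'
                        '/api/dataset/registration/DUOS-000123')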