Index AnVIL dataset description from DUOS (#5547)
nadove-ucsc committed Nov 14, 2023
1 parent 51f214d commit 2296f29
Showing 14 changed files with 241 additions and 42 deletions.
1 change: 1 addition & 0 deletions deployments/anvilbox/environment.py
@@ -141,6 +141,7 @@ def env() -> Mapping[str, Optional[str]]:
'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',

**(
{
1 change: 1 addition & 0 deletions deployments/anvildev/environment.py
@@ -115,6 +115,7 @@ def env() -> Mapping[str, Optional[str]]:
'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',

'AZUL_ENABLE_MONITORING': '1',

5 changes: 5 additions & 0 deletions environment.py
@@ -618,6 +618,11 @@ def env() -> Mapping[str, Optional[str]]:
#
'AZUL_SAM_SERVICE_URL': None,

# The URL of Terra's DUOS service from which to index descriptions of
# AnVIL datasets. If left unset, this step is skipped during indexing.
#
'AZUL_DUOS_SERVICE_URL': None,

# OAuth2 Client ID to be used for authenticating users. See section
# 3.2 of the README
#
2 changes: 1 addition & 1 deletion lambdas/service/app.py
@@ -227,7 +227,7 @@
# changes and reset the minor version to zero. Otherwise, increment only
# the minor version for backwards compatible changes. A backwards
# compatible change is one that does not require updates to clients.
'version': '3.0'
'version': '3.1'
},
'tags': [
{
2 changes: 1 addition & 1 deletion lambdas/service/openapi.json
@@ -3,7 +3,7 @@
"info": {
"title": "azul_service",
"description": "\n# Overview\n\nAzul is a REST web service for querying metadata associated with\nboth experimental and analysis data from a data repository. In order\nto deliver response times that make it suitable for interactive use\ncases, the set of metadata properties that it exposes for sorting,\nfiltering, and aggregation is limited. Azul provides a uniform view\nof the metadata over a range of diverse schemas, effectively\nshielding clients from changes in the schemas as they occur over\ntime. It does so, however, at the expense of detail in the set of\nmetadata properties it exposes and in the accuracy with which it\naggregates them.\n\nAzul denormalizes and aggregates metadata into several different\nindices for selected entity types. Metadata entities can be queried\nusing the [Index](#operations-tag-Index) endpoints.\n\nA set of indices forms a catalog. There is a default catalog called\n`dcp2` which will be used unless a\ndifferent catalog name is specified using the `catalog` query\nparameter. Metadata from different catalogs is completely\nindependent: a response obtained by querying one catalog does not\nnecessarily correlate to a response obtained by querying another\none. Two catalogs can contain metadata from the same sources or\ndifferent sources. It is only guaranteed that the body of a\nresponse by any given endpoint adheres to one schema,\nindependently of which catalog was specified in the request.\n\nAzul provides the ability to download data and metadata via the\n[Manifests](#operations-tag-Manifests) endpoints. The\n`curl` format manifests can be used to\ndownload data files. Other formats provide various views of the\nmetadata. Manifests can be generated for a selection of files using\nfilters. These filters are interchangeable with the filters used by\nthe [Index](#operations-tag-Index) endpoints.\n\nAzul also provides a [summary](#operations-Index-get_index_summary)\nview of indexed data.\n\n## Data model\n\nAny index, when queried, returns a JSON array of hits. Each hit\nrepresents a metadata entity. Nested in each hit is a summary of the\nproperties of entities associated with the hit. An entity is\nassociated either by a direct edge in the original metadata graph,\nor indirectly as a series of edges. The nested properties are\ngrouped by the type of the associated entity. The properties of all\ndata files associated with a particular sample, for example, are\nlisted under `hits[*].files` in a `/index/samples` response. It is\nimportant to note that while each _hit_ represents a discrete\nentity, the properties nested within that hit are the result of an\naggregation over potentially many associated entities.\n\nTo illustrate this, consider a data file that is part of two\nprojects (a project is a group of related experiments, typically by\none laboratory, institution or consortium). Querying the `files`\nindex for this file yields a hit looking something like:\n\n```\n{\n \"projects\": [\n {\n \"projectTitle\": \"Project One\"\n \"laboratory\": ...,\n ...\n },\n {\n \"projectTitle\": \"Project Two\"\n \"laboratory\": ...,\n ...\n }\n ],\n \"files\": [\n {\n \"format\": \"pdf\",\n \"name\": \"Team description.pdf\",\n ...\n }\n ]\n}\n```\n\nThis example hit contains two kinds of nested entities (a hit in an\nactual Azul response will contain more): There are the two projects\nentities, and the file itself. These nested entities contain\nselected metadata properties extracted in a consistent way. 
This\nmakes filtering and sorting simple.\n\nAlso notice that there is only one file. When querying a particular\nindex, the corresponding entity will always be a singleton like\nthis.\n",
"version": "3.0"
"version": "3.1"
},
"tags": [
{
5 changes: 5 additions & 0 deletions src/azul/__init__.py
@@ -322,6 +322,11 @@ def tdr_service_url(self) -> mutable_furl:
def sam_service_url(self) -> mutable_furl:
return furl(self.environ['AZUL_SAM_SERVICE_URL'])

@property
def duos_service_url(self) -> Optional[mutable_furl]:
url = self.environ.get('AZUL_DUOS_SERVICE_URL')
return None if url is None else furl(url)

@property
def dss_query_prefix(self) -> str:
return self.environ.get('AZUL_DSS_QUERY_PREFIX', '')
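A brief usage sketch (not part of the commit) of how downstream code is expected to gate on the new optional property, mirroring the `config.duos_service_url is None` checks in the repository plugin further below:

```
# Sketch only: consumers gate the DUOS indexing step on the optional URL.
from azul import config

if config.duos_service_url is None:
    # AZUL_DUOS_SERVICE_URL is unset; dataset descriptions are not indexed.
    pass
else:
    # A mutable furl, ready for .set(path=...) as in terra.py below.
    url = config.duos_service_url
```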
74 changes: 61 additions & 13 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -18,6 +18,7 @@
attrgetter,
)
from typing import (
AbstractSet,
Callable,
Collection,
Iterable,
@@ -34,6 +35,10 @@

from azul import (
JSON,
cache,
)
from azul.collections import (
deep_dict_merge,
)
from azul.indexer import (
BundleFQID,
@@ -133,7 +138,7 @@ def field_types(cls) -> FieldTypes:
return {
'activities': cls._activity_types(),
'biosamples': cls._biosample_types(),
'datasets': cls._dataset_types(),
'datasets': {**cls._dataset_types(), **cls._duos_types()},
'diagnoses': cls._diagnosis_types(),
'donors': cls._donor_types(),
'files': cls._aggregate_file_types(),
@@ -229,6 +234,13 @@ def _biosample_types(cls) -> FieldTypes:
'donor_age_at_collection': pass_thru_json,
}

@classmethod
def _duos_types(cls) -> FieldTypes:
return {
'document_id': null_str,
'description': null_str,
}

@classmethod
def _dataset_types(cls) -> FieldTypes:
return {
@@ -378,6 +390,9 @@ def _biosample(self, biosample: EntityReference) -> MutableJSON:
def _dataset(self, dataset: EntityReference) -> MutableJSON:
return self._entity(dataset, self._dataset_types())

def _duos(self, dataset: EntityReference) -> MutableJSON:
return self._entity(dataset, self._duos_types())

def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON:
return self._entity(diagnosis,
self._diagnosis_types(),
@@ -418,7 +433,35 @@ def reconcile_inner_entities(cls,
) -> tuple[JSON, BundleFQID]:
this_entity, this_bundle = this
that_entity, that_bundle = that
return that if that_bundle.version > this_bundle.version else this
# All AnVIL bundles use a fixed known version
assert this_bundle.version == that_bundle.version, (this, that)
if this_entity.keys() == that_entity.keys():
return this
else:
assert entity_type == 'datasets', (entity_type, this, that)
expected_keys = cls._complete_dataset_keys()
# There will be one contribution for a DUOS stub, and many redundant
# contributions (one per non-DUOS bundle) for the dataset metadata
# from BigQuery. Once the stub has been merged with a single main
# contribution to consolidate all expected fields, we can disregard
# the other contributions as usual.
if this_entity.keys() == expected_keys:
return this
elif that_entity.keys() == expected_keys:
return that
else:
assert this_entity.keys() < expected_keys, this
assert that_entity.keys() < expected_keys, that
merged = deep_dict_merge((this_entity, that_entity))
assert merged.keys() == expected_keys, (this, that)
# We can safely discard that_bundle because only the version is
# used by the caller, and we know the versions are equal.
return merged, this_bundle

@classmethod
@cache
def _complete_dataset_keys(cls) -> AbstractSet[str]:
return cls.field_types()['datasets'].keys()


class ActivityTransformer(BaseTransformer):
@@ -469,17 +512,22 @@ def entity_type(cls) -> str:
return 'datasets'

def _transform(self, entity: EntityReference) -> Contribution:
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
self._entities_by_type[activity_type]
for activity_type in self._activity_polymorphic_types
)),
biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
datasets=[self._dataset(entity)],
diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
donors=self._entities(self._donor, self._entities_by_type['donor']),
files=self._entities(self._file, self._entities_by_type['file']),
)
try:
dataset = self._dataset(entity)
except KeyError:
contents = dict(datasets=[self._duos(entity)])
else:
contents = dict(
activities=self._entities(self._activity, chain.from_iterable(
self._entities_by_type[activity_type]
for activity_type in self._activity_polymorphic_types
)),
biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
datasets=[dataset],
diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
donors=self._entities(self._donor, self._entities_by_type['donor']),
files=self._entities(self._file, self._entities_by_type['file']),
)
return self._contribution(contents, entity)


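To make the reconciliation above concrete, here is a minimal sketch of the kind of recursive merge that consolidates the once-per-snapshot DUOS stub with a main dataset contribution. It is an illustration only; the real `azul.collections.deep_dict_merge` may treat conflicting leaf values differently.

```
from collections.abc import Iterable, Mapping

def deep_dict_merge(dicts: Iterable[Mapping]) -> dict:
    # Union the keys of all mappings, recursing into nested mappings.
    merged: dict = {}
    for d in dicts:
        for k, v in d.items():
            if isinstance(v, Mapping) and isinstance(merged.get(k), Mapping):
                merged[k] = deep_dict_merge((merged[k], v))
            else:
                merged[k] = v  # in this sketch, later contributions win
    return merged

# The DUOS stub carries only the fields from _duos_types(); the BigQuery
# contribution carries the rest, so the merged keys match
# _complete_dataset_keys(). The field values here are made up.
stub = {'document_id': 'd', 'description': 'From DUOS'}
main = {'document_id': 'd', 'title': 'Some dataset'}
assert deep_dict_merge((stub, main)).keys() == {'document_id', 'description', 'title'}
```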
78 changes: 65 additions & 13 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -60,6 +60,7 @@
MutableJSONs,
)
from azul.uuids import (
change_version,
validate_uuid_prefix,
)

@@ -98,9 +99,21 @@ class BundleEntityType(Enum):
specifically tailored traversal implementation. Supplementary bundles always
consist of exactly two entities: one file (the bundle entity) and one
dataset.

The `dataset.description` field is unusual in that it is not stored in
BigQuery and must be retrieved via Terra's DUOS API. There is only one
dataset per snapshot, which is referenced in all primary and supplementary
bundles. Therefore, only one request to DUOS per *snapshot* is necessary,
but if `description` is retrieved at the same time as the other dataset
fields, we will make one request per *bundle* instead, potentially
overloading the DUOS service. Our solution is to retrieve `description` only
in a dedicated bundle format, once per snapshot, and merge it with the other
dataset fields during aggregation. This bundle contains only a single
dataset entity with only the `description` field populated.
"""
primary: EntityType = 'biosample'
supplementary: EntityType = 'file'
duos: EntityType = 'dataset'


class TDRAnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
@@ -178,26 +191,48 @@ def _list_bundles(self,
validate_uuid_prefix(partition_prefix)
primary = BundleEntityType.primary.value
supplementary = BundleEntityType.supplementary.value
rows = self._run_sql(f'''
duos = BundleEntityType.duos.value
rows = list(self._run_sql(f'''
SELECT datarepo_row_id, {primary!r} AS entity_type
FROM {backtick(self._full_table_name(spec, primary))}
WHERE STARTS_WITH(datarepo_row_id, '{partition_prefix}')
UNION ALL
SELECT datarepo_row_id, {supplementary!r} AS entity_type
FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{partition_prefix}')
''')
return [
TDRAnvilBundleFQID(source=source,
# Reversibly tweak the entity UUID to prevent
# collisions between entity IDs and bundle IDs
uuid=uuids.change_version(row['datarepo_row_id'],
self.datarepo_row_uuid_version,
self.bundle_uuid_version),
version=self._version,
entity_type=BundleEntityType(row['entity_type']))
for row in rows
]
''' + (
''
if config.duos_service_url is None else
f'''
UNION ALL
SELECT datarepo_row_id, {duos!r} AS entity_type
FROM {backtick(self._full_table_name(spec, duos))}
'''
)))
bundles = []
dataset_count = 0
for row in rows:
# We intentionally omit the WHERE clause for datasets so that we can
# verify our assumption that each snapshot only contains rows for a
# single dataset. This verification is performed independently and
# concurrently for every partition, but only one partition actually
# emits the bundle.
if row['entity_type'] == duos:
require(0 == dataset_count)
dataset_count += 1
if not row['datarepo_row_id'].startswith(partition_prefix):
continue
bundles.append(TDRAnvilBundleFQID(
source=source,
# Reversibly tweak the entity UUID to prevent
# collisions between entity IDs and bundle IDs
uuid=uuids.change_version(row['datarepo_row_id'],
self.datarepo_row_uuid_version,
self.bundle_uuid_version),
version=self._version,
entity_type=BundleEntityType(row['entity_type'])
))
return bundles

def list_partitions(self,
source: TDRSourceRef
@@ -250,6 +285,10 @@ def _emulate_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
elif bundle_fqid.entity_type is BundleEntityType.supplementary:
log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
return self._supplementary_bundle(bundle_fqid)
elif bundle_fqid.entity_type is BundleEntityType.duos:
assert config.duos_service_url is not None, bundle_fqid
log.info('Bundle %r is a dataset description', bundle_fqid.uuid)
return self._dataset_description(bundle_fqid)
else:
assert False, bundle_fqid.entity_type

@@ -334,6 +373,19 @@ def _supplementary_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
result.add_links({Link(**link_args)}, entities_by_key)
return result

def _dataset_description(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
description = self.tdr.get_duos(bundle_fqid.source)['studyDescription']
entity_id = change_version(bundle_fqid.uuid,
self.bundle_uuid_version,
self.datarepo_row_uuid_version)
entity = EntityReference(entity_type=bundle_fqid.entity_type.value,
entity_id=entity_id)
bundle = TDRAnvilBundle(fqid=bundle_fqid)
bundle.add_entity(entity=entity,
version=self._version,
row={'description': description})
return bundle

def _bundle_entity(self, bundle_fqid: TDRAnvilBundleFQID) -> KeyReference:
source = bundle_fqid.source
bundle_uuid = bundle_fqid.uuid
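The "reversible tweak" that maps datarepo row IDs to bundle UUIDs (and back again in `_dataset_description`) comes from `azul.uuids.change_version`. Below is a rough sketch of the idea, assuming it rewrites only the UUID's 4-bit version field; the concrete version numbers are placeholders, not the plugin's actual constants.

```
import uuid

def change_version(s: str, old_version: int, new_version: int) -> str:
    u = uuid.UUID(s)
    assert u.version == old_version, (s, old_version)
    # The version nibble is the 13th hex digit; swap it, leave the rest.
    h = u.hex
    return str(uuid.UUID(h[:12] + f'{new_version:x}' + h[13:]))

row_id = '123e4567-e89b-42d3-a456-426614174000'    # hypothetical datarepo row id
bundle_id = change_version(row_id, 4, 10)          # avoid colliding with entity ids
assert change_version(bundle_id, 10, 4) == row_id  # the tweak reverses cleanly
```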
10 changes: 10 additions & 0 deletions src/azul/terra.py
@@ -554,6 +554,9 @@ def _job_info(self, job: QueryJob) -> JSON:
def _repository_endpoint(self, *path: str) -> mutable_furl:
return config.tdr_service_url.set(path=('api', 'repository', 'v1', *path))

def _duos_endpoint(self, *path: str) -> mutable_furl:
return config.duos_service_url.set(path=('api', *path))

def _check_response(self,
endpoint: furl,
response: urllib3.HTTPResponse
@@ -658,3 +661,10 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient':

def drs_client(self) -> DRSClient:
return DRSClient(http_client=self._http_client)

def get_duos(self, source: SourceRef) -> MutableJSON:
response = self._retrieve_source(source)
duos_id = response['duosFirecloudGroup']['duosId']
url = self._duos_endpoint('dataset', 'registration', duos_id)
response = self._request('GET', url)
return self._check_response(url, response)
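For orientation, this is roughly the request that `get_duos` ends up making, shown with a made-up DUOS ID (the real one is read from the snapshot's `duosFirecloudGroup`):

```
from furl import furl

duos_base = furl('https://consent.dsde-dev.broadinstitute.org')
duos_id = 'DUOS-000123'  # hypothetical; really response['duosFirecloudGroup']['duosId']
url = duos_base.set(path=('api', 'dataset', 'registration', duos_id))
# GET https://consent.dsde-dev.broadinstitute.org/api/dataset/registration/DUOS-000123
# The JSON response's 'studyDescription' becomes dataset.description.
```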
17 changes: 17 additions & 0 deletions test/indexer/__init__.py
@@ -19,6 +19,9 @@
from elasticsearch.helpers import (
scan,
)
from more_itertools import (
one,
)

from azul import (
CatalogName,
@@ -30,6 +33,7 @@
SourcedBundleFQID,
)
from azul.indexer.document import (
DocumentType,
IndexName,
)
from azul.indexer.index_service import (
@@ -137,9 +141,22 @@ def _get_all_hits(self):
index=','.join(self.index_service.index_names(self.catalog)),
preserve_order=True))
for hit in hits:
entity_type, doc_type = self._parse_index_name(hit)
if (
entity_type == 'datasets'
and doc_type is DocumentType.contribution
and 'description' in one(hit['_source']['contents']['datasets'])
):
# DUOS contributions contain no lists
continue
self._verify_sorted_lists(hit['_source'])
return hits

def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
index_name = IndexName.parse(hit['_index'])
index_name.validate()
return index_name.entity_type, index_name.doc_type

def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs:
"""
Load the canned index documents for the given canned bundle and fix the

Some generated files are not rendered by default.