[a r] Index dataset description from Terra API (#5547)
nadove-ucsc committed Oct 27, 2023
1 parent 33ad940 commit 674ed45
Showing 13 changed files with 210 additions and 49 deletions.
1 change: 1 addition & 0 deletions deployments/anvilbox/environment.py
@@ -141,6 +141,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',

         **(
             {
1 change: 1 addition & 0 deletions deployments/anvildev/environment.py
@@ -115,6 +115,7 @@ def env() -> Mapping[str, Optional[str]]:
         'AZUL_TDR_SOURCE_LOCATION': 'us-central1',
         'AZUL_TDR_SERVICE_URL': 'https://jade.datarepo-dev.broadinstitute.org',
         'AZUL_SAM_SERVICE_URL': 'https://sam.dsde-dev.broadinstitute.org',
+        'AZUL_DUOS_SERVICE_URL': 'https://consent.dsde-dev.broadinstitute.org',

         'AZUL_ENABLE_MONITORING': '1',
2 changes: 1 addition & 1 deletion lambdas/service/app.py
@@ -227,7 +227,7 @@
         # changes and reset the minor version to zero. Otherwise, increment only
         # the minor version for backwards compatible changes. A backwards
         # compatible change is one that does not require updates to clients.
-        'version': '2.0'
+        'version': '2.1'
     },
     'tags': [
         {
4 changes: 4 additions & 0 deletions src/azul/__init__.py
@@ -315,6 +315,10 @@ def tdr_service_url(self) -> mutable_furl:
     def sam_service_url(self) -> mutable_furl:
         return furl(self.environ['AZUL_SAM_SERVICE_URL'])

+    @property
+    def duos_service_url(self) -> mutable_furl:
+        return furl(self.environ['AZUL_DUOS_SERVICE_URL'])
+
     @property
     def dss_query_prefix(self) -> str:
         return self.environ.get('AZUL_DSS_QUERY_PREFIX', '')
25 changes: 16 additions & 9 deletions src/azul/indexer/index_service.py
@@ -44,6 +44,9 @@
     config,
     freeze,
 )
+from azul.collections import (
+    deep_dict_merge,
+)
 from azul.deployment import (
     aws,
 )
@@ -717,15 +720,19 @@ def _select_latest(self,
             cur_bundle_uuid, cur_bundle_version, cur_entity = \
                 collated_entities.get(entity_id, (None, '', None))
             if cur_entity is not None and entity.keys() != cur_entity.keys():
-                symmetric_difference = set(entity.keys()).symmetric_difference(cur_entity)
-                log.warning('Document shape of `%s` entity `%s` does not match between bundles '
-                            '%s, version %s and %s, version %s: %s',
-                            entity_type, entity_id,
-                            cur_bundle_uuid, cur_bundle_version,
-                            contribution.coordinates.bundle.uuid,
-                            contribution.coordinates.bundle.version,
-                            symmetric_difference)
-            if cur_bundle_version < contribution.coordinates.bundle.version:
+                if cur_bundle_version == contribution.coordinates.bundle.version:
+                    assert contribution.entity.entity_type == 'datasets', contribution
+                    entity = deep_dict_merge((entity, cur_entity))
+                else:
+                    symmetric_difference = set(entity.keys()).symmetric_difference(cur_entity)
+                    log.warning('Document shape of `%s` entity `%s` does not match between bundles '
+                                '%s, version %s and %s, version %s: %s',
+                                entity_type, entity_id,
+                                cur_bundle_uuid, cur_bundle_version,
+                                contribution.coordinates.bundle.uuid,
+                                contribution.coordinates.bundle.version,
+                                symmetric_difference)
+            if cur_bundle_version <= contribution.coordinates.bundle.version:
                 collated_entities[entity_id] = (
                     contribution.coordinates.bundle.uuid,
                     contribution.coordinates.bundle.version,
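For intuition, here is a minimal sketch of the kind of recursive merge that `deep_dict_merge` (imported above from `azul.collections`) performs when a sparse, description-only dataset contribution and a full one carry the same bundle version. `deep_dict_merge_sketch` is a hypothetical stand-in and the field names other than `description` are made up; the real helper's semantics may differ in detail.

```python
from typing import Any, Iterable, Mapping

def deep_dict_merge_sketch(dicts: Iterable[Mapping[str, Any]]) -> dict[str, Any]:
    # Merge mappings recursively; for scalar values, earlier dicts win.
    merged: dict[str, Any] = {}
    for d in dicts:
        for k, v in d.items():
            if isinstance(v, Mapping) and isinstance(merged.get(k), Mapping):
                merged[k] = deep_dict_merge_sketch((merged[k], v))
            else:
                merged.setdefault(k, v)
    return merged

# A full dataset entity merged with the sparse, description-only one:
full = {'document_id': 'd1', 'title': 'ANVIL_CMG_Example'}  # 'title' is a stand-in field
sparse = {'document_id': 'd1', 'description': 'From DUOS'}
assert deep_dict_merge_sketch((full, sparse)) == {
    'document_id': 'd1',
    'title': 'ANVIL_CMG_Example',
    'description': 'From DUOS',
}
```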
39 changes: 27 additions & 12 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -125,7 +125,7 @@ def field_types(cls) -> FieldTypes:
         return {
             'activities': cls._activity_types(),
             'biosamples': cls._biosample_types(),
-            'datasets': cls._dataset_types(),
+            'datasets': {**cls._dataset_types(), **cls._sparse_dataset_types()},
             'diagnoses': cls._diagnosis_types(),
             'donors': cls._donor_types(),
             'files': cls._aggregate_file_types(),
@@ -221,6 +221,13 @@ def _biosample_types(cls) -> FieldTypes:
             'donor_age_at_collection': pass_thru_json,
         }

+    @classmethod
+    def _sparse_dataset_types(cls) -> FieldTypes:
+        return {
+            'document_id': null_str,
+            'description': null_str,
+        }
+
     @classmethod
     def _dataset_types(cls) -> FieldTypes:
         return {
@@ -370,6 +377,9 @@ def _biosample(self, biosample: EntityReference) -> MutableJSON:
     def _dataset(self, dataset: EntityReference) -> MutableJSON:
         return self._entity(dataset, self._dataset_types())

+    def _sparse_dataset(self, dataset: EntityReference) -> MutableJSON:
+        return self._entity(dataset, self._sparse_dataset_types())
+
     def _diagnosis(self, diagnosis: EntityReference) -> MutableJSON:
         return self._entity(diagnosis,
                             self._diagnosis_types(),
@@ -446,17 +456,22 @@ def entity_type(cls) -> str:
         return 'datasets'

     def _transform(self, entity: EntityReference) -> Contribution:
-        contents = dict(
-            activities=self._entities(self._activity, chain.from_iterable(
-                self._entities_by_type[activity_type]
-                for activity_type in self._activity_polymorphic_types
-            )),
-            biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
-            datasets=[self._dataset(entity)],
-            diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
-            donors=self._entities(self._donor, self._entities_by_type['donor']),
-            files=self._entities(self._file, self._entities_by_type['file']),
-        )
+        try:
+            dataset = self._dataset(entity)
+        except KeyError:
+            contents = dict(datasets=[self._sparse_dataset(entity)])
+        else:
+            contents = dict(
+                activities=self._entities(self._activity, chain.from_iterable(
+                    self._entities_by_type[activity_type]
+                    for activity_type in self._activity_polymorphic_types
+                )),
+                biosamples=self._entities(self._biosample, self._entities_by_type['biosample']),
+                datasets=[dataset],
+                diagnoses=self._entities(self._diagnosis, self._entities_by_type['diagnosis']),
+                donors=self._entities(self._donor, self._entities_by_type['donor']),
+                files=self._entities(self._file, self._entities_by_type['file']),
+            )
         return self._contribution(contents, entity)
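To make the fallback concrete, here is a hypothetical miniature of the projection above: the row behind a dataset-description bundle carries only the sparse fields, so projecting the full field set raises `KeyError` and the transformer emits a sparse contribution instead. `title` is a stand-in for the full dataset fields, which the diff does not enumerate.

```python
# Made-up row from a dataset-description bundle: only sparse fields present.
row = {'document_id': 'd1', 'description': 'Indexed from DUOS'}

full_fields = ('document_id', 'title', 'description')  # 'title' is a stand-in
sparse_fields = ('document_id', 'description')

try:
    contents = {f: row[f] for f in full_fields}    # raises KeyError: 'title'
except KeyError:
    contents = {f: row[f] for f in sparse_fields}  # sparse projection succeeds

assert contents == row
```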
72 changes: 59 additions & 13 deletions src/azul/plugins/repository/tdr_anvil/__init__.py
@@ -60,6 +60,7 @@
     MutableJSONs,
 )
 from azul.uuids import (
+    change_version,
     validate_uuid_prefix,
 )

@@ -98,9 +99,21 @@ class BundleEntityType(Enum):
     specifically tailored traversal implementation. Supplementary bundles always
     consist of exactly two entities: one file (the bundle entity) and one
     dataset.
+
+    The `dataset.description` field is unusual in that it is not stored in
+    BigQuery and must be retrieved via Terra's DUOS API. There is only one
+    dataset per snapshot, which is referenced in all primary and supplementary
+    bundles. Therefore, only one request to DUOS per *snapshot* is necessary,
+    but if `description` were retrieved at the same time as the other dataset
+    fields, we would make one request per *bundle* instead, potentially
+    overloading the DUOS service. Our solution is to retrieve `description`
+    only in a dedicated bundle format, once per snapshot, and merge it with
+    the other dataset fields during aggregation. Such a bundle contains only
+    a single dataset entity with only the `description` field populated.
     """
     primary: EntityType = 'biosample'
     supplementary: EntityType = 'file'
+    dataset: EntityType = 'dataset'


 class AnvilBundleFQIDJSON(SourcedBundleFQIDJSON):
@@ -178,26 +191,43 @@ def _list_bundles(self,
         validate_uuid_prefix(partition_prefix)
         primary = BundleEntityType.primary.value
         supplementary = BundleEntityType.supplementary.value
-        rows = self._run_sql(f'''
+        dataset = BundleEntityType.dataset.value
+        rows = list(self._run_sql(f'''
             SELECT datarepo_row_id, {primary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, primary))}
             WHERE STARTS_WITH(datarepo_row_id, '{partition_prefix}')
             UNION ALL
             SELECT datarepo_row_id, {supplementary!r} AS entity_type
             FROM {backtick(self._full_table_name(spec, supplementary))} AS supp
             WHERE supp.is_supplementary AND STARTS_WITH(datarepo_row_id, '{partition_prefix}')
-        ''')
-        return [
-            AnvilBundleFQID(source=source,
-                            # Reversibly tweak the entity UUID to prevent
-                            # collisions between entity IDs and bundle IDs
-                            uuid=uuids.change_version(row['datarepo_row_id'],
-                                                      self.datarepo_row_uuid_version,
-                                                      self.bundle_uuid_version),
-                            version=self._version,
-                            entity_type=BundleEntityType(row['entity_type']))
-            for row in rows
-        ]
+            UNION ALL
+            SELECT datarepo_row_id, {dataset!r} AS entity_type
+            FROM {backtick(self._full_table_name(spec, dataset))}
+        '''))
+        bundles = []
+        dataset_count = 0
+        for row in rows:
+            # We intentionally omit the WHERE clause for datasets so that we can
+            # verify our assumption that each snapshot only contains rows for a
+            # single dataset. This verification is performed independently and
+            # concurrently for every partition, but only one partition actually
+            # emits the bundle.
+            if row['entity_type'] == dataset:
+                require(0 == dataset_count)
+                dataset_count += 1
+                if not row['datarepo_row_id'].startswith(partition_prefix):
+                    continue
+            bundles.append(AnvilBundleFQID(
+                source=source,
+                # Reversibly tweak the entity UUID to prevent
+                # collisions between entity IDs and bundle IDs
+                uuid=uuids.change_version(row['datarepo_row_id'],
+                                          self.datarepo_row_uuid_version,
+                                          self.bundle_uuid_version),
+                version=self._version,
+                entity_type=BundleEntityType(row['entity_type'])
+            ))
+        return bundles

     def list_partitions(self,
                         source: TDRSourceRef
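As an aside, here is a minimal sketch of the reversible UUID tweak the comment above refers to. `change_version_sketch` is a hypothetical stand-in for `azul.uuids.change_version`, and the version digits used in the example are illustrative, not the values the plugin actually configures.

```python
# Hypothetical stand-in for azul.uuids.change_version: swap the UUID
# version nibble (the first hex digit of the third dash-separated group),
# asserting that the old value is the expected one. Since only that one
# nibble changes, the mapping is reversible.
def change_version_sketch(uuid: str, old: int, new: int) -> str:
    assert int(uuid[14], 16) == old, (uuid, old)
    return uuid[:14] + format(new, 'x') + uuid[15:]

row_id = '2ed45965-8cc6-4a41-bd96-bb0a52a0e95b'  # made-up datarepo row ID
bundle_id = change_version_sketch(row_id, 4, 10)  # version digits illustrative
assert change_version_sketch(bundle_id, 10, 4) == row_id
```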
@@ -250,6 +280,9 @@ def _emulate_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         elif bundle_fqid.entity_type is BundleEntityType.supplementary:
             log.info('Bundle %r is a supplementary bundle', bundle_fqid.uuid)
             return self._supplementary_bundle(bundle_fqid)
+        elif bundle_fqid.entity_type is BundleEntityType.dataset:
+            log.info('Bundle %r is a dataset description', bundle_fqid.uuid)
+            return self._dataset_description(bundle_fqid)
         else:
             assert False, bundle_fqid.entity_type

@@ -334,6 +367,19 @@ def _supplementary_bundle(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
         result.add_links({Link(**link_args)}, entities_by_key)
         return result

+    def _dataset_description(self, bundle_fqid: AnvilBundleFQID) -> TDRAnvilBundle:
+        description = self.tdr.get_duos(bundle_fqid.source)['studyDescription']
+        entity_id = change_version(bundle_fqid.uuid,
+                                   self.bundle_uuid_version,
+                                   self.datarepo_row_uuid_version)
+        entity = EntityReference(entity_type=bundle_fqid.entity_type.value,
+                                 entity_id=entity_id)
+        bundle = TDRAnvilBundle(fqid=bundle_fqid)
+        bundle.add_entity(entity=entity,
+                          version=self._version,
+                          row={'description': description})
+        return bundle
+
     def _bundle_entity(self, bundle_fqid: AnvilBundleFQID) -> KeyReference:
         source = bundle_fqid.source
         bundle_uuid = bundle_fqid.uuid
10 changes: 10 additions & 0 deletions src/azul/terra.py
@@ -555,6 +555,9 @@ def _job_info(self, job: QueryJob) -> JSON:
     def _repository_endpoint(self, *path: str) -> mutable_furl:
         return config.tdr_service_url.set(path=('api', 'repository', 'v1', *path))

+    def _duos_endpoint(self, *path: str) -> mutable_furl:
+        return config.duos_service_url.set(path=('api', *path))
+
     def _check_response(self,
                         endpoint: furl,
                         response: urllib3.HTTPResponse
@@ -659,3 +662,10 @@ def for_registered_user(cls, authentication: OAuth2) -> 'TDRClient':

     def drs_client(self) -> DRSClient:
         return DRSClient(http_client=self._http_client)
+
+    def get_duos(self, source: SourceRef) -> MutableJSON:
+        response = self._retrieve_source(source)
+        duos_id = response['duosFirecloudGroup']['duosId']
+        url = self._duos_endpoint('dataset', 'registration', duos_id)
+        response = self._request('GET', url)
+        return self._check_response(url, response)
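To see how the endpoint composes, here is a hedged sketch using furl directly, with the base URL configured as `AZUL_DUOS_SERVICE_URL` in the deployment files above and a made-up DUOS ID standing in for the one read from the snapshot's `duosFirecloudGroup`:

```python
from furl import furl

# Base URL as configured for anvildev; 'DUOS-000123' is a made-up DUOS ID.
url = furl('https://consent.dsde-dev.broadinstitute.org')
url.set(path=('api', 'dataset', 'registration', 'DUOS-000123'))
print(url)
# https://consent.dsde-dev.broadinstitute.org/api/dataset/registration/DUOS-000123
```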
17 changes: 17 additions & 0 deletions test/indexer/__init__.py
@@ -15,6 +15,9 @@
 from elasticsearch.helpers import (
     scan,
 )
+from more_itertools import (
+    one,
+)

 from azul import (
     CatalogName,
@@ -26,6 +29,7 @@
     SourcedBundleFQID,
 )
 from azul.indexer.document import (
+    DocumentType,
     IndexName,
 )
 from azul.indexer.index_service import (
@@ -133,9 +137,22 @@ def _get_all_hits(self):
             index=','.join(self.index_service.index_names(self.catalog)),
             preserve_order=True))
         for hit in hits:
+            entity_type, doc_type = self._parse_index_name(hit)
+            if (
+                entity_type == 'datasets'
+                and doc_type is DocumentType.contribution
+                and 'description' in one(hit['_source']['contents']['datasets'])
+            ):
+                # Sparse dataset contributions contain no lists
+                continue
             self._verify_sorted_lists(hit['_source'])
         return hits

+    def _parse_index_name(self, hit) -> tuple[str, DocumentType]:
+        index_name = IndexName.parse(hit['_index'])
+        index_name.validate()
+        return index_name.entity_type, index_name.doc_type
+
     def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs:
         """
         Load the canned index documents for the given canned bundle and fix the

Some generated files are not rendered by default.

