Commit 42cb07d

Restructure HCA bundles (#6491, #4565, partial #6299, PR #6485)

achave11-ucsc committed Sep 3, 2024
2 parents ec4e3b1 + 4c70d10

Showing 124 changed files with 120,899 additions and 166,278 deletions.
@@ -177,7 +177,6 @@ def dss_bundle_to_tdr(bundle: Bundle, source: TDRSourceRef) -> TDRHCABundle:
     links_entry = None
     for entry in manifest:
         entry['version'] = convert_version(entry['version'])
-        entry['is_stitched'] = False
         if entry['name'] == 'links.json':
             links_entry = entry
         if entry['indexed']:
4 changes: 3 additions & 1 deletion src/azul/plugins/metadata/hca/__init__.py
@@ -80,7 +80,9 @@ def transformers(self,
         api_bundle = api.Bundle(uuid=bundle.uuid,
                                 version=bundle.version,
                                 manifest=bundle.manifest,
-                                metadata_files=bundle.metadata_files)
+                                metadata=bundle.metadata,
+                                links_json=bundle.links,
+                                stitched_entity_ids=bundle.stitched)
 
         def transformers():
             for transformer_cls in self.transformer_types():
26 changes: 19 additions & 7 deletions src/azul/plugins/metadata/hca/bundle.py
@@ -15,15 +15,14 @@
 from azul.types import (
     JSON,
     MutableJSON,
-    MutableJSONs,
 )
 
 log = logging.getLogger(__name__)
 
 
 @attrs.define(kw_only=True)
 class HCABundle(Bundle[BUNDLE_FQID], ABC):
-    manifest: MutableJSONs
+    manifest: MutableJSON
     """
     Each item of the `manifest` attribute's value has this shape:
     {
@@ -39,22 +38,35 @@ class HCABundle(Bundle[BUNDLE_FQID], ABC):
         'version': '2019-05-16T162155.020000Z'
     }
     """
-    metadata_files: MutableJSON
+    metadata: MutableJSON
+    links: MutableJSON
+    stitched: set[str] = attrs.field(factory=set)
 
     def reject_joiner(self, catalog: CatalogName):
         self._reject_joiner(self.manifest)
-        self._reject_joiner(self.metadata_files)
+        self._reject_joiner(self.metadata)
+        self._reject_joiner(self.links)
 
     def to_json(self) -> MutableJSON:
         return {
             'manifest': self.manifest,
-            'metadata': self.metadata_files
+            'metadata': self.metadata,
+            'links': self.links,
+            'stitched': sorted(self.stitched)
         }
 
     @classmethod
     def from_json(cls, fqid: BUNDLE_FQID, json_: JSON) -> 'Bundle':
         manifest = json_['manifest']
         metadata = json_['metadata']
-        assert isinstance(manifest, list), manifest
+        links = json_['links']
+        stitched = json_['stitched']
+        assert isinstance(manifest, dict), manifest
         assert isinstance(metadata, dict), metadata
-        return cls(fqid=fqid, manifest=manifest, metadata_files=metadata)
+        assert isinstance(links, dict), links
+        assert isinstance(stitched, list), stitched
+        return cls(fqid=fqid,
+                   manifest=manifest,
+                   metadata=metadata,
+                   links=links,
+                   stitched=set(stitched))
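Taken together, the changes in bundle.py redefine the serialized bundle layout: the manifest becomes a dict keyed by entity reference instead of a list of entries, the merged links.json document gets its own top-level key, and the IDs of stitched-in entities are stored as a sorted list. Below is a hypothetical sketch of the new shape; the entity references, UUIDs and sizes are invented for illustration:

bundle_json = {
    'manifest': {
        # keyed by str(EntityReference); values keep the entry shape
        # documented in the docstring above
        'sequence_file/11112222-3333-4444-5555-666677778888': {
            'name': 'R1.fastq.gz',
            'uuid': 'aaaabbbb-cccc-dddd-eeee-ffff00001111',
            'version': '2019-05-16T162155.020000Z',
            'content-type': 'application/gzip; dcp-type=data',
            'size': 195142
        }
    },
    'metadata': {
        # metadata documents, also keyed by entity reference
        'sequence_file/11112222-3333-4444-5555-666677778888': {'schema_type': 'file'}
    },
    'links': {'schema_type': 'links', 'links': []},
    'stitched': ['99990000-aaaa-bbbb-cccc-ddddeeeeffff']
}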
8 changes: 4 additions & 4 deletions src/azul/plugins/metadata/hca/indexer/transform.py
@@ -511,7 +511,7 @@ def _add_replica(self,
         if not config.enable_replicas:
             replica = None
         elif self.entity_type() == 'bundles':
-            links = self.bundle.metadata_files['links.json']
+            links = self.bundle.links
             replica = self._replica(links, entity_ref, hub_ids)
         else:
             assert isinstance(entity, api.Entity), entity
@@ -1428,7 +1428,7 @@ def entity_type(cls) -> str:
         return 'files'
 
     def _entities(self) -> Iterable[api.File]:
-        return api.not_stitched(self.api_bundle.files.values())
+        return self.api_bundle.not_stitched(self.api_bundle.files)
 
     def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]:
         zarr_stores: Mapping[str, list[api.File]] = self.group_zarrs(files)
@@ -1586,7 +1586,7 @@ def inner_entity_types(cls) -> frozenset[str]:
 
     def _entities(self) -> Iterable[Sample]:
         samples: dict[str, Sample] = dict()
-        for file in api.not_stitched(self.api_bundle.files.values()):
+        for file in self.api_bundle.not_stitched(self.api_bundle.files):
             self._find_ancestor_samples(file, samples)
         return samples.values()
 
@@ -1643,7 +1643,7 @@ def _singleton_entity(self) -> DatedEntity:
         raise NotImplementedError
 
     def _dated_entities(self) -> Iterable[DatedEntity]:
-        return api.not_stitched(self.api_bundle.entities.values())
+        return self.api_bundle.not_stitched(self.api_bundle.entities)
 
     def estimate(self, partition: BundlePartition) -> int:
         return int(partition.contains(self._singleton_id))
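These hunks show `not_stitched` changing from a free function in the metadata API module into a method on the API bundle, which can consult the bundle's own record of stitched entity IDs; callers now pass the entity mapping itself rather than its .values(). A minimal self-contained sketch of what such a method could look like, assuming the bundle keeps the stitched IDs in a set (the actual implementation lives in the humancellatlas metadata API package and may differ):

from typing import Iterable, Mapping, TypeVar
from uuid import UUID

E = TypeVar('E')


class ApiBundleSketch:
    """Hypothetical stand-in for the metadata API's Bundle class."""

    def __init__(self, stitched_entity_ids: set[UUID]):
        self.stitched_entity_ids = stitched_entity_ids

    def not_stitched(self, entities: Mapping[UUID, E]) -> Iterable[E]:
        # Keep only entities native to this subgraph, i.e. those whose
        # ID was not stitched in from another subgraph.
        return [
            entity
            for entity_id, entity in entities.items()
            if entity_id not in self.stitched_entity_ids
        ]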
10 changes: 4 additions & 6 deletions src/azul/plugins/repository/canned/__init__.py
@@ -25,7 +25,6 @@
 from typing import (
     Optional,
     Type,
-    cast,
 )
 
 from furl import (
@@ -64,8 +63,6 @@
 )
 from azul.types import (
     JSON,
-    MutableJSON,
-    MutableJSONs,
 )
 from azul.uuids import (
     validate_uuid_prefix,
@@ -195,14 +192,15 @@ def fetch_bundle(self, bundle_fqid: CannedBundleFQID) -> CannedBundle:
         self._assert_source(bundle_fqid.source)
         now = time.time()
         staging_area = self.staging_area(bundle_fqid.source.spec.name)
-        version, manifest, metadata = staging_area.get_bundle_parts(bundle_fqid.uuid)
+        version, manifest, metadata, links = staging_area.get_bundle_parts(bundle_fqid.uuid)
         if bundle_fqid.version is None:
             bundle_fqid = CannedBundleFQID(source=bundle_fqid.source,
                                            uuid=bundle_fqid.uuid,
                                            version=version)
         bundle = CannedBundle(fqid=bundle_fqid,
-                              manifest=cast(MutableJSONs, manifest),
-                              metadata_files=cast(MutableJSON, metadata))
+                              manifest=manifest,
+                              metadata=metadata,
+                              links=links)
         assert version == bundle.version, (version, bundle)
         log.info('It took %.003fs to download bundle %s.%s',
                  time.time() - now, bundle.uuid, bundle.version)
2 changes: 1 addition & 1 deletion src/azul/plugins/repository/dss/__init__.py
@@ -140,7 +140,7 @@ def list_bundles(self,
     def fetch_bundle(self, bundle_fqid: DSSBundleFQID) -> DSSBundle:
         assert False, 'DSS is EOL'
         # noinspection PyUnreachableCode
-        return DSSBundle(fqid=bundle_fqid, manifest=[], metadata_files={})
+        return DSSBundle(fqid=bundle_fqid, manifest={}, metadata={}, links={})
 
     def dss_subscription_query(self, prefix: str) -> JSON:
         return {
90 changes: 31 additions & 59 deletions src/azul/plugins/repository/tdr_hca/__init__.py
@@ -184,39 +184,29 @@ def canning_qualifier(cls) -> str:
 
     def add_entity(self,
                    *,
-                   entity_key: str,
-                   entity_type: EntityType,
-                   entity_row: BigQueryRow,
+                   entity: EntityReference,
+                   row: BigQueryRow,
                    is_stitched: bool
                    ) -> None:
-        entity_id = entity_row[entity_type + '_id']
-        self._add_manifest_entry(name=entity_key,
-                                 uuid=entity_id,
-                                 version=TDRPlugin.format_version(entity_row['version']),
-                                 size=entity_row['content_size'],
-                                 content_type='application/json',
-                                 dcp_type=f'"metadata/{entity_row["schema_type"]}"',
-                                 is_stitched=is_stitched)
-        if entity_type.endswith('_file'):
-            descriptor = json.loads(entity_row['descriptor'])
-            self._add_manifest_entry(name=entity_row['file_name'],
+        if is_stitched:
+            self.stitched.add(entity.entity_id)
+        if entity.entity_type.endswith('_file'):
+            descriptor = json.loads(row['descriptor'])
+            self._add_manifest_entry(entity,
+                                     name=row['file_name'],
                                      uuid=descriptor['file_id'],
                                      version=descriptor['file_version'],
                                      size=descriptor['size'],
                                      content_type=descriptor['content_type'],
                                      dcp_type='data',
-                                     is_stitched=is_stitched,
                                      checksums=Checksums.from_json(descriptor),
-                                     drs_uri=self._parse_drs_uri(entity_row['file_id'], descriptor))
-        content = entity_row['content']
-        self.metadata_files[entity_key] = (json.loads(content)
-                                           if isinstance(content, str)
-                                           else content)
+                                     drs_uri=self._parse_drs_uri(row['file_id'], descriptor))
+        content = row['content']
+        self.metadata[str(entity)] = (json.loads(content)
+                                      if isinstance(content, str)
+                                      else content)
 
     metadata_columns: ClassVar[set[str]] = {
         'version',
-        'JSON_EXTRACT_SCALAR(content, "$.schema_type") AS schema_type',
-        'BYTE_LENGTH(content) AS content_size',
         'content'
     }

@@ -235,28 +225,22 @@ def add_entity(self,
     _suffix = 'tdr.'
 
     def _add_manifest_entry(self,
+                            entity: EntityReference,
                             *,
                             name: str,
                             uuid: str,
                             version: str,
                             size: int,
                             content_type: str,
                             dcp_type: str,
-                            is_stitched: bool,
                             checksums: Optional[Checksums] = None,
                             drs_uri: Optional[str] = None) -> None:
-        # These requirements prevent mismatches in the DRS domain, and ensure
-        # that changes to the column syntax don't go undetected.
-        if drs_uri is not None:
-            parsed = RegularDRSURI.parse(drs_uri)
-            require(parsed.uri.netloc == config.tdr_service_url.netloc)
-        self.manifest.append({
+        self.manifest[str(entity)] = {
             'name': name,
             'uuid': uuid,
             'version': version,
             'content-type': f'{content_type}; dcp-type={dcp_type}',
             'size': size,
-            'is_stitched': is_stitched,
             **(
                 {
                     'indexed': True,
@@ -268,7 +252,7 @@ def _add_manifest_entry(self,
                     **checksums.to_json()
                 }
             )
-        })
+        }
 
     def _parse_drs_uri(self,
                        file_id: Optional[str],
@@ -288,6 +272,10 @@ def _parse_drs_uri(self,
             external_drs_uri = None
             return external_drs_uri
         else:
+            # This requirement prevents mismatches in the DRS domain and
+            # ensures that changes to the column syntax don't go undetected.
+            parsed = RegularDRSURI.parse(file_id)
+            require(parsed.uri.netloc == config.tdr_service_url.netloc)
             return file_id


@@ -340,13 +328,11 @@ def _query_unique_sorted(self,
 
     def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDRHCABundle:
         bundle = TDRHCABundle(fqid=bundle_fqid,
-                              manifest=[],
-                              metadata_files={})
+                              manifest={},
+                              metadata={},
+                              links={})
         entities, root_entities, links_jsons = self._stitch_bundles(bundle)
-        bundle.add_entity(entity_key='links.json',
-                          entity_type='links',
-                          entity_row=self._merge_links(links_jsons),
-                          is_stitched=False)
+        bundle.links = self._merge_links(links_jsons)
 
         with ThreadPoolExecutor(max_workers=config.num_tdr_workers) as executor:
             futures = {
@@ -362,18 +348,16 @@ def _emulate_bundle(self, bundle_fqid: TDRBundleFQID) -> TDRHCABundle:
                     rows = future.result()
                     pk_column = entity_type + '_id'
                     rows.sort(key=itemgetter(pk_column))
-                    for i, row in enumerate(rows):
-                        is_stitched = EntityReference(entity_id=row[pk_column],
-                                                      entity_type=entity_type) not in root_entities
-                        bundle.add_entity(entity_key=f'{entity_type}_{i}.json',
-                                          entity_type=entity_type,
-                                          entity_row=row,
+                    for row in rows:
+                        entity = EntityReference(entity_id=row[pk_column], entity_type=entity_type)
+                        is_stitched = entity not in root_entities
+                        bundle.add_entity(entity=entity,
+                                          row=row,
                                           is_stitched=is_stitched)
                 else:
                     log.error('TDR worker failed to retrieve entities of type %r',
                               entity_type, exc_info=e)
                     raise e
-        bundle.manifest.sort(key=itemgetter('uuid'))
         return bundle
 
     def _stitch_bundles(self,
@@ -470,7 +454,6 @@ def _retrieve_entities(self,
             else TDRHCABundle.data_columns if entity_type.endswith('_file')
             else TDRHCABundle.metadata_columns
         )
-        assert version_column in non_pk_columns
         table_name = backtick(self._full_table_name(source, entity_type))
         entity_id_type = one(set(map(type, entity_ids)))
 
@@ -555,12 +538,6 @@ def _merge_links(self, links_jsons: JSONs) -> JSON:
"""
root, *stitched = links_jsons
if stitched:
merged = {
'links_id': root['links_id'],
'version': root['version']
}
for common_key in ('project_id', 'schema_type'):
merged[common_key] = one({row[common_key] for row in links_jsons})
source_contents = [row['content'] for row in links_jsons]
# FIXME: Explicitly verify compatible schema versions for stitched subgraphs
# https://github.com/DataBiosphere/azul/issues/3215
Expand All @@ -574,14 +551,9 @@ def _merge_links(self, links_jsons: JSONs) -> JSON:
                 'describedBy': str(schema_url),
                 'links': sum((sc['links'] for sc in source_contents), start=[])
             }
-            merged['content'] = merged_content  # Keep result of parsed JSON for reuse
-            merged['content_size'] = len(json.dumps(merged_content))
-            assert merged.keys() == one({
-                frozenset(row.keys()) for row in links_jsons
-            }), merged
             assert merged_content.keys() == one({
                 frozenset(sc.keys()) for sc in source_contents
             }), merged_content
-            return merged
+            return merged_content
         else:
-            return root
+            return root['content']
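With the BigQuery row wrapper gone, `_merge_links` now returns only the merged links document itself rather than a synthetic row carrying `content`, `content_size` and related bookkeeping fields. A toy, runnable illustration of the concatenation step seen in the diff above; the two input documents are invented and stripped down to just their 'links' arrays:

links_a = {'links': [{'link_type': 'process_link'}]}
links_b = {'links': [{'link_type': 'supplementary_file_link'}]}
source_contents = [links_a, links_b]

# Concatenate the per-subgraph 'links' arrays into a single document,
# mirroring the sum(..., start=[]) expression above.
merged_content = {
    'links': sum((sc['links'] for sc in source_contents), start=[])
}
assert len(merged_content['links']) == 2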
5 changes: 1 addition & 4 deletions src/humancellatlas/data/metadata/age_range.py
@@ -1,9 +1,6 @@
 from dataclasses import (
     dataclass,
 )
-from typing import (
-    Optional,
-)
 
 
 @dataclass(frozen=True)
@@ -89,7 +86,7 @@ def fail():
         else:
             raise fail() from e1
 
-    def cvt(value: str, default: float) -> Optional[float]:
+    def cvt(value: str, default: float) -> float | None:
         assert isinstance(default, float)
         try:
             return factor * float(value) if value else default