From 118e0560965696700bf3e128c80b0cc702d79393 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Fri, 15 Mar 2024 20:45:34 -0700 Subject: [PATCH 1/4] Revert "[r] Track replica hub IDs (#5360, PR #5987)" This reverts commit 82110e816896d6d08e9454aa7ab39e32f02c05fc, reversing changes made to 63c51a560afc23a252725610c2f6c2db3952dc3c. --- environment.py | 4 -- src/azul/__init__.py | 4 -- src/azul/indexer/document.py | 43 ++++----------- src/azul/indexer/index_controller.py | 12 ++-- src/azul/indexer/index_service.py | 49 +++++++++-------- src/azul/indexer/transform.py | 8 +-- .../metadata/anvil/indexer/transform.py | 55 ++++--------------- .../plugins/metadata/hca/indexer/transform.py | 32 ++--------- test/indexer/__init__.py | 7 +-- ...2-e274-affe-aabc-eb3db63ad068.results.json | 36 +++--------- ...d.2018-11-02T11:33:44.698028Z.results.json | 18 ++---- test/indexer/test_indexer.py | 48 ++++------------ 12 files changed, 85 insertions(+), 231 deletions(-) diff --git a/environment.py b/environment.py index f888a1b8e..637c30704 100644 --- a/environment.py +++ b/environment.py @@ -187,10 +187,6 @@ def env() -> Mapping[str, Optional[str]]: # Whether to create and populate an index for replica documents. 'AZUL_ENABLE_REPLICAS': '1', - # Maximum number of conflicts to allow before giving when writing - # replica documents. - 'AZUL_REPLICA_CONFLICT_LIMIT': '10', - # The name of the current deployment. This variable controls the name of # all cloud resources and is the main vehicle for isolating cloud # resources between deployments. diff --git a/src/azul/__init__.py b/src/azul/__init__.py index 563a5e250..963b6b3d9 100644 --- a/src/azul/__init__.py +++ b/src/azul/__init__.py @@ -765,10 +765,6 @@ def es_volume_size(self) -> int: def enable_replicas(self) -> bool: return self._boolean(self.environ['AZUL_ENABLE_REPLICAS']) - @property - def replica_conflict_limit(self) -> int: - return int(self.environ['AZUL_REPLICA_CONFLICT_LIMIT']) - # Because this property is relatively expensive to produce and frequently # used we are applying aggressive caching here, knowing very well that # this eliminates the option to reconfigure the running process by diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index bf06473db..ddfb7bb8f 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1100,10 +1100,6 @@ class OpType(Enum): #: Remove the document from the index or fail if it does not exist delete = auto() - #: Modify a document in the index via a scripted update or create it if it - #: does not exist - update = auto() - C = TypeVar('C', bound=DocumentCoordinates) @@ -1330,7 +1326,11 @@ def to_index(self, if op_type is OpType.delete else { '_source' if bulk else 'body': - self._body(field_types[coordinates.entity.catalog]) + self.translate_fields(doc=self.to_json(), + field_types=field_types[coordinates.entity.catalog], + forward=True) + if self.needs_translation else + self.to_json() } ), '_id' if bulk else 'id': self.coordinates.document_id @@ -1361,14 +1361,6 @@ def to_index(self, def op_type(self) -> OpType: raise NotImplementedError - def _body(self, field_types: FieldTypes) -> JSON: - body = self.to_json() - if self.needs_translation: - body = self.translate_fields(doc=body, - field_types=field_types, - forward=True) - return body - class DocumentSource(SourceRef[SimpleSourceSpec, SourceRef]): pass @@ -1545,6 +1537,10 @@ class Replica(Document[ReplicaCoordinates[E]]): hub_ids: list[EntityID] + #: The version_type attribute will change to VersionType.none if writing + #: to Elasticsearch fails with 409 + version_type: VersionType = VersionType.create_only + needs_translation: ClassVar[bool] = False def __attrs_post_init__(self): @@ -1559,28 +1555,11 @@ def field_types(cls, field_types: FieldTypes) -> FieldTypes: def to_json(self) -> JSON: return dict(super().to_json(), replica_type=self.replica_type, - # Ensure that index contents is deterministic for unit tests - hub_ids=sorted(set(self.hub_ids))) + hub_ids=self.hub_ids) @property def op_type(self) -> OpType: - assert self.version_type is VersionType.none, self.version_type - return OpType.update - - def _body(self, field_types: FieldTypes) -> JSON: - return { - 'script': { - 'source': ''' - Stream stream = Stream.concat(ctx._source.hub_ids.stream(), - params.hub_ids.stream()); - ctx._source.hub_ids = stream.sorted().distinct().collect(Collectors.toList()); - ''', - 'params': { - 'hub_ids': self.hub_ids - } - }, - 'upsert': super()._body(field_types) - } + return OpType.create CataloguedContribution = Contribution[CataloguedEntityReference] diff --git a/src/azul/indexer/index_controller.py b/src/azul/indexer/index_controller.py index 0c9da8bcf..7536ce9bd 100644 --- a/src/azul/indexer/index_controller.py +++ b/src/azul/indexer/index_controller.py @@ -185,14 +185,10 @@ def contribute(self, event: Iterable[SQSRecord], *, retry=False): for entity, num_contributions in tallies.items()] if replicas: - if delete: - # FIXME: Replica index does not support deletions - # https://github.com/DataBiosphere/azul/issues/5846 - log.warning('Deletion of replicas is not supported') - else: - log.info('Writing %i replicas to index.', len(replicas)) - num_written = self.index_service.replicate(catalog, replicas) - log.info('Successfully wrote %i replicas', num_written) + log.info('Writing %i replicas to index.', len(replicas)) + num_written, num_present = self.index_service.replicate(catalog, replicas) + log.info('Successfully wrote %i replicas; %i were already present', + num_written, num_present) else: log.info('No replicas to write.') diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 3ad90edba..d7553e9ba 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -227,8 +227,7 @@ def delete(self, catalog: CatalogName, bundle: Bundle) -> None: # number of contributions per bundle. # https://github.com/DataBiosphere/azul/issues/610 tallies.update(self.contribute(catalog, contributions)) - # FIXME: Replica index does not support deletions - # https://github.com/DataBiosphere/azul/issues/5846 + self.replicate(catalog, replicas) self.aggregate(tallies) def deep_transform(self, @@ -418,7 +417,7 @@ def contribute(self, with a count of 0 may exist. This is ok. See description of aggregate(). """ tallies = Counter() - writer = self._create_writer(DocumentType.contribution, catalog) + writer = self._create_writer(catalog) while contributions: writer.write(contributions) retry_contributions = [] @@ -452,7 +451,7 @@ def aggregate(self, tallies: CataloguedTallies): catalogs. """ # Use catalog specified in each tally - writer = self._create_writer(DocumentType.aggregate, catalog=None) + writer = self._create_writer(catalog=None) while True: # Read the aggregates old_aggregates = self._read_aggregates(tallies) @@ -506,24 +505,33 @@ def replicate(self, catalog: CatalogName, replicas: list[Replica] ) -> tuple[int, int]: - writer = self._create_writer(DocumentType.replica, catalog) + writer = self._create_writer(catalog) num_replicas = len(replicas) - num_written = 0 + num_written, num_present = 0, 0 while replicas: writer.write(replicas) retry_replicas = [] for r in replicas: if r.coordinates in writer.retries: - retry_replicas.append(r) + conflicts = writer.conflicts[r.coordinates] + if conflicts == 0: + retry_replicas.append(r) + elif conflicts == 1: + # FIXME: Track replica hub IDs + # https://github.com/DataBiosphere/azul/issues/5360 + writer.conflicts.pop(r.coordinates) + num_present += 1 + else: + assert False, (conflicts, r.coordinates) else: num_written += 1 replicas = retry_replicas writer.raise_on_errors() - assert num_written == num_replicas, ( - num_written, num_replicas + assert num_written + num_present == num_replicas, ( + num_written, num_present, num_replicas ) - return num_written + return num_written, num_present def _read_aggregates(self, entities: CataloguedTallies @@ -782,25 +790,16 @@ def _reconcile(self, for entity_type, entities in result.items() } - def _create_writer(self, - doc_type: DocumentType, - catalog: Optional[CatalogName] - ) -> 'IndexWriter': + def _create_writer(self, catalog: Optional[CatalogName]) -> 'IndexWriter': # We allow one conflict retry in the case of duplicate notifications and # switch from 'add' to 'update'. After that, there should be no - # conflicts because we use an SQS FIFO message group per entity. - # Conflicts are common when writing replicas due to entities being - # shared between bundles. For other errors we use SQS message redelivery - # to take care of the retries. - limits = { - DocumentType.contribution: 1, - DocumentType.aggregate: 1, - DocumentType.replica: config.replica_conflict_limit - } + # conflicts because we use an SQS FIFO message group per entity. For + # other errors we use SQS message redelivery to take care of the + # retries. return IndexWriter(catalog, self.catalogued_field_types(), refresh=False, - conflict_retry_limit=limits[doc_type], + conflict_retry_limit=1, error_retry_limit=0) @@ -856,6 +855,8 @@ def write(self, documents: list[Document]): def _write_individually(self, documents: Iterable[Document]): log.info('Writing documents individually') for doc in documents: + if isinstance(doc, Replica): + assert doc.version_type is VersionType.create_only, doc try: method = getattr(self.es_client, doc.op_type.name) method(refresh=self.refresh, **doc.to_index(self.catalog, self.field_types)) diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py index cd8ab6fae..1a9db1c86 100644 --- a/src/azul/indexer/transform.py +++ b/src/azul/indexer/transform.py @@ -125,18 +125,14 @@ def _contribution(self, source=self.bundle.fqid.source, contents=contents) - def _replica(self, - contents: MutableJSON, - entity: EntityReference, - hub_ids: list[EntityID] - ) -> Replica: + def _replica(self, contents: MutableJSON, entity: EntityReference) -> Replica: coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(), entity=entity) return Replica(coordinates=coordinates, version=None, replica_type=self.replica_type(entity), contents=contents, - hub_ids=hub_ids) + hub_ids=[]) @classmethod @abstractmethod diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index 5a62b64b3..106eadfde 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -186,14 +186,11 @@ def _transform(self, entity: EntityReference) -> Transform: def _add_replica(self, contribution: Optional[MutableJSON], entity: EntityReference, - hub_ids: list[EntityID] ) -> Transform: no_replica = not config.enable_replicas or self.entity_type() == 'bundles' return ( None if contribution is None else self._contribution(contribution, entity), - None if no_replica else self._replica(self.bundle.entities[entity], - entity, - hub_ids) + None if no_replica else self._replica(self.bundle.entities[entity], entity) ) def _pluralize(self, entity_type: str) -> str: @@ -477,8 +474,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]: class SingletonTransformer(BaseTransformer, metaclass=ABCMeta): - def _contents(self) -> MutableJSON: - return dict( + def _transform(self, entity: EntityReference) -> Transform: + contents = dict( activities=self._entities(self._activity, chain.from_iterable( self._entities_by_type[activity_type] for activity_type in self._activity_polymorphic_types @@ -489,6 +486,7 @@ def _contents(self) -> MutableJSON: donors=self._entities(self._donor, self._entities_by_type['donor']), files=self._entities(self._file, self._entities_by_type['file']) ) + return self._add_replica(contents, entity) @classmethod def field_types(cls) -> FieldTypes: @@ -529,17 +527,15 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=[self._activity(entity)], biosamples=self._entities(self._biosample, linked['biosample']), datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']), ) - hub_ids = [f.entity_id for f in files] - return self._add_replica(contents, entity, hub_ids) + return self._add_replica(contents, entity) class BiosampleTransformer(BaseTransformer): @@ -550,7 +546,6 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -560,18 +555,15 @@ def _transform(self, entity: EntityReference) -> Transform: datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']), ) - hub_ids = [f.entity_id for f in files] - return self._add_replica(contents, entity, hub_ids) + return self._add_replica(contents, entity) class DiagnosisTransformer(BaseTransformer): def _transform(self, entity: EntityReference) -> Transform: - files = self._linked_entities(entity)['file'] - hub_ids = [f.entity_id for f in files] - return self._add_replica(None, entity, hub_ids) + return self._add_replica(None, entity) @classmethod def entity_type(cls) -> EntityType: @@ -588,11 +580,6 @@ def _singleton(self) -> EntityReference: return EntityReference(entity_type='bundle', entity_id=self.bundle.uuid) - def _transform(self, entity: EntityReference) -> Transform: - contents = self._contents() - hub_ids = [f.entity_id for f in self._entities_by_type['file']] - return self._add_replica(contents, entity, hub_ids) - class DatasetTransformer(SingletonTransformer): @@ -603,18 +590,6 @@ def entity_type(cls) -> str: def _singleton(self) -> EntityReference: return self._only_dataset() - def _transform(self, entity: EntityReference) -> Transform: - contents = self._contents() - # Every file in a snapshot is linked to that snapshot's singular - # dataset, making an explicit list of hub IDs for the dataset both - # redundant and impractically large (we observe that for large - # snapshots, trying to track this many files in a single data structure - # causes a prohibitively high rate of conflicts during replica updates). - # Therefore, we leave the hub IDs field empty for datasets and rely on - # the tenet that every file is an implicit hub of its parent dataset. - hub_ids = [] - return self._add_replica(contents, entity, hub_ids) - class DonorTransformer(BaseTransformer): @@ -624,7 +599,6 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) - files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -634,10 +608,9 @@ def _transform(self, entity: EntityReference) -> Transform: datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=[self._donor(entity)], - files=self._entities(self._file, files), + files=self._entities(self._file, linked['file']), ) - hub_ids = [f.entity_id for f in files] - return self._add_replica(contents, entity, hub_ids) + return self._add_replica(contents, entity) class FileTransformer(BaseTransformer): @@ -659,8 +632,4 @@ def _transform(self, entity: EntityReference) -> Transform: donors=self._entities(self._donor, linked['donor']), files=[self._file(entity)], ) - # The result of the link traversal does not include the starting entity, - # so without this step the file itself wouldn't be included in its hubs - files = (entity, *linked['file']) - hub_ids = [f.entity_id for f in files] - return self._add_replica(contents, entity, hub_ids) + return self._add_replica(contents, entity) diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py index ab1183453..3dbed2c1d 100644 --- a/src/azul/plugins/metadata/hca/indexer/transform.py +++ b/src/azul/plugins/metadata/hca/indexer/transform.py @@ -503,8 +503,7 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]: def _add_replica(self, contribution: MutableJSON, - entity: Union[api.Entity, DatedEntity], - hub_ids: list[EntityID] + entity: Union[api.Entity, DatedEntity] ) -> Transform: entity_ref = EntityReference(entity_id=str(entity.document_id), entity_type=self.entity_type()) @@ -512,7 +511,7 @@ def _add_replica(self, replica = None else: assert isinstance(entity, api.Entity), entity - replica = self._replica(entity.json, entity_ref, hub_ids) + replica = self._replica(entity.json, entity_ref) return self._contribution(contribution, entity_ref), replica def _find_ancestor_samples(self, @@ -1466,8 +1465,7 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]: additional_contents = self.matrix_stratification_values(file) for entity_type, values in additional_contents.items(): contents[entity_type].extend(values) - hub_ids = list(map(str, visitor.files)) - yield self._add_replica(contents, file, hub_ids) + yield self._add_replica(contents, file) def matrix_stratification_values(self, file: api.File) -> JSON: """ @@ -1562,8 +1560,7 @@ def _transform(self, ), dates=[self._date(cell_suspension)], projects=[self._project(self._api_project)]) - hub_ids = list(map(str, visitor.files)) - yield self._add_replica(contents, cell_suspension, hub_ids) + yield self._add_replica(contents, cell_suspension) class SampleTransformer(PartitionedTransformer): @@ -1608,8 +1605,7 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]: ), dates=[self._date(sample)], projects=[self._project(self._api_project)]) - hub_ids = list(map(str, visitor.files)) - yield self._add_replica(contents, sample, hub_ids) + yield self._add_replica(contents, sample) class BundleAsEntity(DatedEntity): @@ -1709,12 +1705,7 @@ def _transform(self) -> Transform: contributed_analyses=contributed_analyses, dates=[self._date(self._singleton_entity())], projects=[self._project(self._api_project)]) - hub_ids = self._hub_ids(visitor) - return self._add_replica(contents, self._singleton_entity(), hub_ids) - - @abstractmethod - def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: - raise NotImplementedError + return self._add_replica(contents, self._singleton_entity()) class ProjectTransformer(SingletonTransformer): @@ -1726,14 +1717,6 @@ def _singleton_entity(self) -> DatedEntity: def entity_type(cls) -> str: return 'projects' - def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: - # Every file in a snapshot is linked to that snapshot's singular - # project, making an explicit list of hub IDs for the project both - # redundant and impractically large. Therefore, we leave the hub IDs - # field empty for projects and rely on the tenet that every file is an - # implicit hub of its parent project. - return [] - class BundleTransformer(SingletonTransformer): @@ -1750,6 +1733,3 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]: @classmethod def entity_type(cls) -> str: return 'bundles' - - def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: - return list(map(str, visitor.files)) diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py index b7e199c1f..c72d7fabe 100644 --- a/test/indexer/__init__.py +++ b/test/indexer/__init__.py @@ -70,11 +70,8 @@ class ForcedRefreshIndexService(IndexService): - def _create_writer(self, - doc_type: DocumentType, - catalog: Optional[CatalogName] - ) -> IndexWriter: - writer = super()._create_writer(doc_type, catalog) + def _create_writer(self, catalog: Optional[CatalogName]) -> IndexWriter: + writer = super()._create_writer(catalog) # With a single client thread, refresh=True is faster than # refresh="wait_for". The latter would limit the request rate to # 1/refresh_interval. That's only one request per second with diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json index 352613383..66134fba1 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json @@ -438,9 +438,7 @@ "_source": { "entity_id": "1509ef40-d1ba-440d-b298-16b7c173dcd4", "replica_type": "anvil_sequencingactivity", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6" - ], + "hub_ids": [], "contents": { "activity_type": "Sequencing", "assay_type": [], @@ -870,9 +868,7 @@ "_source": { "entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "replica_type": "anvil_file", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6" - ], + "hub_ids": [], "contents": { "data_modality": [], "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", @@ -902,10 +898,7 @@ "_source": { "entity_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f", "replica_type": "anvil_diagnosis", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6", - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ], + "hub_ids": [], "contents": { "datarepo_row_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f", "diagnosis_age_lower_bound": null, @@ -1754,9 +1747,7 @@ "crc32": "" }, "replica_type": "anvil_file", - "hub_ids": [ - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ] + "hub_ids": [] } }, { @@ -2198,9 +2189,7 @@ "_source": { "entity_id": "816e364e-1193-4e5b-a91a-14e4b009157c", "replica_type": "anvil_sequencingactivity", - "hub_ids": [ - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ], + "hub_ids": [], "contents": { "activity_type": "Sequencing", "assay_type": [], @@ -2946,10 +2935,7 @@ "_source": { "entity_id": "826dea02-e274-4ffe-aabc-eb3db63ad068", "replica_type": "anvil_biosample", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6", - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ], + "hub_ids": [], "contents": { "anatomical_site": null, "apriori_cell_type": [], @@ -3525,10 +3511,7 @@ "_source": { "entity_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461", "replica_type": "anvil_diagnosis", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6", - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ], + "hub_ids": [], "contents": { "datarepo_row_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461", "diagnosis_age_lower_bound": null, @@ -4116,10 +4099,7 @@ "version": "2022-06-01T00:00:00.000000Z" }, "replica_type": "anvil_donor", - "hub_ids": [ - "15b76f9c-6b46-433f-851d-34e89f1b9ba6", - "3b17377b-16b1-431c-9967-e5d01fc5923f" - ] + "hub_ids": [] } } ] diff --git a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json index d8223b730..39c53d767 100644 --- a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json +++ b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json @@ -3386,9 +3386,7 @@ }, "entity_id": "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", "replica_type": "file", - "hub_ids": [ - "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb" - ] + "hub_ids": [] }, "_type": "_doc" }, @@ -3424,10 +3422,7 @@ }, "entity_id": "412898c5-5b9b-4907-b07c-e9b89666e204", "replica_type": "cell_suspension", - "hub_ids": [ - "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" - ] + "hub_ids": [] }, "_type": "_doc" }, @@ -3456,9 +3451,7 @@ }, "entity_id": "70d1af4a-82c8-478a-8960-e9028b3616ca", "replica_type": "file", - "hub_ids": [ - "70d1af4a-82c8-478a-8960-e9028b3616ca" - ] + "hub_ids": [] }, "_type": "_doc" }, @@ -3508,10 +3501,7 @@ }, "entity_id": "a21dc760-a500-4236-bcff-da34a0e873d2", "replica_type": "sample", - "hub_ids": [ - "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", - "70d1af4a-82c8-478a-8960-e9028b3616ca" - ] + "hub_ids": [] }, "_type": "_doc" }, diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py index 7620e87be..b15972482 100644 --- a/test/indexer/test_indexer.py +++ b/test/indexer/test_indexer.py @@ -61,7 +61,6 @@ EntityReference, EntityType, IndexName, - Replica, ReplicaCoordinates, null_bool, null_int, @@ -132,7 +131,7 @@ def metadata_plugin(self) -> MetadataPlugin: def _num_expected_replicas(self, num_contribs: int, - num_bundles: Optional[int] = None + num_bundles: int = 1 ) -> int: """ :param num_contribs: Number of contributions with distinct contents @@ -141,10 +140,8 @@ def _num_expected_replicas(self, entities :return: How many replicas the indices are expected to contain """ - if num_bundles is None: - num_bundles = 0 if num_contribs == 0 else 1 - # Bundle entities are not replicated. - return num_contribs - num_bundles if config.enable_replicas else 0 + # Bundle entities are not replicated + return max(0, num_contribs - num_bundles) if config.enable_replicas else 0 def _assert_hit_counts(self, hits: list[JSON], @@ -401,8 +398,11 @@ def test_duplicate_notification(self): expected_tallies_2[entity] = 1 self.assertEqual(expected_tallies_2, tallies_2) - # All writes were logged as overwrites, except one. - self.assertEqual(num_contributions - 1, len(logs.output)) + # All writes were logged as overwrites, except one. There are 1 fewer + # replicas than contributions. + num_replicas = num_contributions - 2 if config.enable_replicas else 0 + self.assertEqual(num_contributions - 1 + num_replicas, + len(logs.output)) message_re = re.compile(r'^WARNING:azul\.indexer\.index_service:' r'Document .* exists. ' r'Retrying with overwrite\.$') @@ -473,13 +473,11 @@ def _assert_index_counts(self, *, just_deletion: bool): self.assertEqual(len(actual_addition_contributions), num_expected_addition_contributions) self.assertEqual(len(actual_deletion_contributions), num_expected_deletion_contributions) self._assert_hit_counts(hits, - # Deletion notifications add deletion markers to the contributions index - # instead of removing the existing contributions. num_contribs=num_expected_addition_contributions + num_expected_deletion_contributions, num_aggs=num_expected_aggregates, - # These deletion markers do not affect the number of replicas because we don't - # support deleting replicas. - num_replicas=self._num_expected_replicas(num_expected_addition_contributions)) + # Indexing the same contribution twice (regardless of whether either is a deletion) + # does not create a new replica, so we expect fewer replicas than contributions + num_replicas=self._num_expected_replicas(num_expected_deletion_contributions)) def test_bundle_delete_downgrade(self): """ @@ -2110,30 +2108,6 @@ def test_disallow_manifest_column_joiner(self): with self.assertRaisesRegex(RequirementError, "'||' is disallowed"): self._index_bundle(bundle) - def test_replica_update(self): - contents = {'replica': {}} - coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(), - entity=CataloguedEntityReference(catalog=self.catalog, - entity_type='replica', - entity_id='foo')) - replica = Replica(version=None, - replica_type='file', - contents=contents, - hub_ids=[], - coordinates=coordinates) - - for case, hub_ids, expected_hub_ids in [ - ('New replica', ['1', '1'], ['1']), - ('Additional hub IDs', ['3', '2', '1'], ['1', '2', '3']), - ('Redundant hub IDs', ['1', '2'], ['1', '2', '3']) - ]: - with self.subTest(case): - replica.hub_ids[:] = hub_ids - self.index_service.replicate(self.catalog, [replica]) - hit = one(self._get_all_hits()) - self.assertEqual(hit['_id'], coordinates.document_id) - self.assertEqual(hit['_source']['hub_ids'], expected_hub_ids) - class TestIndexManagement(AzulUnitTestCase): From 4f5141a9bd100d89464208e763d6fc283a9f8ff2 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Sat, 16 Mar 2024 21:52:16 -0700 Subject: [PATCH 2/4] Enable replicas on `prod` (#6062) Not a fix. --- deployments/prod/environment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index 6ac64044a..433192ed6 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -1121,5 +1121,5 @@ def env() -> Mapping[str, Optional[str]]: 'channel_id': 'C04JWDFCPFZ' # #team-boardwalk-prod }), - 'AZUL_ENABLE_REPLICAS': '0', + 'AZUL_ENABLE_REPLICAS': '1', } From 4885aba33ddc6d23bd89aed7b80d4a8178f59aa5 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Sat, 16 Mar 2024 21:52:42 -0700 Subject: [PATCH 3/4] Revert "Revert "[r] Track replica hub IDs (#5360, PR #5987)"" This reverts commit 118e0560965696700bf3e128c80b0cc702d79393. --- environment.py | 4 ++ src/azul/__init__.py | 4 ++ src/azul/indexer/document.py | 43 +++++++++++---- src/azul/indexer/index_controller.py | 12 ++-- src/azul/indexer/index_service.py | 49 ++++++++--------- src/azul/indexer/transform.py | 8 ++- .../metadata/anvil/indexer/transform.py | 55 +++++++++++++++---- .../plugins/metadata/hca/indexer/transform.py | 32 +++++++++-- test/indexer/__init__.py | 7 ++- ...2-e274-affe-aabc-eb3db63ad068.results.json | 36 +++++++++--- ...d.2018-11-02T11:33:44.698028Z.results.json | 18 ++++-- test/indexer/test_indexer.py | 48 ++++++++++++---- 12 files changed, 231 insertions(+), 85 deletions(-) diff --git a/environment.py b/environment.py index 637c30704..f888a1b8e 100644 --- a/environment.py +++ b/environment.py @@ -187,6 +187,10 @@ def env() -> Mapping[str, Optional[str]]: # Whether to create and populate an index for replica documents. 'AZUL_ENABLE_REPLICAS': '1', + # Maximum number of conflicts to allow before giving when writing + # replica documents. + 'AZUL_REPLICA_CONFLICT_LIMIT': '10', + # The name of the current deployment. This variable controls the name of # all cloud resources and is the main vehicle for isolating cloud # resources between deployments. diff --git a/src/azul/__init__.py b/src/azul/__init__.py index 963b6b3d9..563a5e250 100644 --- a/src/azul/__init__.py +++ b/src/azul/__init__.py @@ -765,6 +765,10 @@ def es_volume_size(self) -> int: def enable_replicas(self) -> bool: return self._boolean(self.environ['AZUL_ENABLE_REPLICAS']) + @property + def replica_conflict_limit(self) -> int: + return int(self.environ['AZUL_REPLICA_CONFLICT_LIMIT']) + # Because this property is relatively expensive to produce and frequently # used we are applying aggressive caching here, knowing very well that # this eliminates the option to reconfigure the running process by diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index ddfb7bb8f..bf06473db 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -1100,6 +1100,10 @@ class OpType(Enum): #: Remove the document from the index or fail if it does not exist delete = auto() + #: Modify a document in the index via a scripted update or create it if it + #: does not exist + update = auto() + C = TypeVar('C', bound=DocumentCoordinates) @@ -1326,11 +1330,7 @@ def to_index(self, if op_type is OpType.delete else { '_source' if bulk else 'body': - self.translate_fields(doc=self.to_json(), - field_types=field_types[coordinates.entity.catalog], - forward=True) - if self.needs_translation else - self.to_json() + self._body(field_types[coordinates.entity.catalog]) } ), '_id' if bulk else 'id': self.coordinates.document_id @@ -1361,6 +1361,14 @@ def to_index(self, def op_type(self) -> OpType: raise NotImplementedError + def _body(self, field_types: FieldTypes) -> JSON: + body = self.to_json() + if self.needs_translation: + body = self.translate_fields(doc=body, + field_types=field_types, + forward=True) + return body + class DocumentSource(SourceRef[SimpleSourceSpec, SourceRef]): pass @@ -1537,10 +1545,6 @@ class Replica(Document[ReplicaCoordinates[E]]): hub_ids: list[EntityID] - #: The version_type attribute will change to VersionType.none if writing - #: to Elasticsearch fails with 409 - version_type: VersionType = VersionType.create_only - needs_translation: ClassVar[bool] = False def __attrs_post_init__(self): @@ -1555,11 +1559,28 @@ def field_types(cls, field_types: FieldTypes) -> FieldTypes: def to_json(self) -> JSON: return dict(super().to_json(), replica_type=self.replica_type, - hub_ids=self.hub_ids) + # Ensure that index contents is deterministic for unit tests + hub_ids=sorted(set(self.hub_ids))) @property def op_type(self) -> OpType: - return OpType.create + assert self.version_type is VersionType.none, self.version_type + return OpType.update + + def _body(self, field_types: FieldTypes) -> JSON: + return { + 'script': { + 'source': ''' + Stream stream = Stream.concat(ctx._source.hub_ids.stream(), + params.hub_ids.stream()); + ctx._source.hub_ids = stream.sorted().distinct().collect(Collectors.toList()); + ''', + 'params': { + 'hub_ids': self.hub_ids + } + }, + 'upsert': super()._body(field_types) + } CataloguedContribution = Contribution[CataloguedEntityReference] diff --git a/src/azul/indexer/index_controller.py b/src/azul/indexer/index_controller.py index 7536ce9bd..0c9da8bcf 100644 --- a/src/azul/indexer/index_controller.py +++ b/src/azul/indexer/index_controller.py @@ -185,10 +185,14 @@ def contribute(self, event: Iterable[SQSRecord], *, retry=False): for entity, num_contributions in tallies.items()] if replicas: - log.info('Writing %i replicas to index.', len(replicas)) - num_written, num_present = self.index_service.replicate(catalog, replicas) - log.info('Successfully wrote %i replicas; %i were already present', - num_written, num_present) + if delete: + # FIXME: Replica index does not support deletions + # https://github.com/DataBiosphere/azul/issues/5846 + log.warning('Deletion of replicas is not supported') + else: + log.info('Writing %i replicas to index.', len(replicas)) + num_written = self.index_service.replicate(catalog, replicas) + log.info('Successfully wrote %i replicas', num_written) else: log.info('No replicas to write.') diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index d7553e9ba..3ad90edba 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -227,7 +227,8 @@ def delete(self, catalog: CatalogName, bundle: Bundle) -> None: # number of contributions per bundle. # https://github.com/DataBiosphere/azul/issues/610 tallies.update(self.contribute(catalog, contributions)) - self.replicate(catalog, replicas) + # FIXME: Replica index does not support deletions + # https://github.com/DataBiosphere/azul/issues/5846 self.aggregate(tallies) def deep_transform(self, @@ -417,7 +418,7 @@ def contribute(self, with a count of 0 may exist. This is ok. See description of aggregate(). """ tallies = Counter() - writer = self._create_writer(catalog) + writer = self._create_writer(DocumentType.contribution, catalog) while contributions: writer.write(contributions) retry_contributions = [] @@ -451,7 +452,7 @@ def aggregate(self, tallies: CataloguedTallies): catalogs. """ # Use catalog specified in each tally - writer = self._create_writer(catalog=None) + writer = self._create_writer(DocumentType.aggregate, catalog=None) while True: # Read the aggregates old_aggregates = self._read_aggregates(tallies) @@ -505,33 +506,24 @@ def replicate(self, catalog: CatalogName, replicas: list[Replica] ) -> tuple[int, int]: - writer = self._create_writer(catalog) + writer = self._create_writer(DocumentType.replica, catalog) num_replicas = len(replicas) - num_written, num_present = 0, 0 + num_written = 0 while replicas: writer.write(replicas) retry_replicas = [] for r in replicas: if r.coordinates in writer.retries: - conflicts = writer.conflicts[r.coordinates] - if conflicts == 0: - retry_replicas.append(r) - elif conflicts == 1: - # FIXME: Track replica hub IDs - # https://github.com/DataBiosphere/azul/issues/5360 - writer.conflicts.pop(r.coordinates) - num_present += 1 - else: - assert False, (conflicts, r.coordinates) + retry_replicas.append(r) else: num_written += 1 replicas = retry_replicas writer.raise_on_errors() - assert num_written + num_present == num_replicas, ( - num_written, num_present, num_replicas + assert num_written == num_replicas, ( + num_written, num_replicas ) - return num_written, num_present + return num_written def _read_aggregates(self, entities: CataloguedTallies @@ -790,16 +782,25 @@ def _reconcile(self, for entity_type, entities in result.items() } - def _create_writer(self, catalog: Optional[CatalogName]) -> 'IndexWriter': + def _create_writer(self, + doc_type: DocumentType, + catalog: Optional[CatalogName] + ) -> 'IndexWriter': # We allow one conflict retry in the case of duplicate notifications and # switch from 'add' to 'update'. After that, there should be no - # conflicts because we use an SQS FIFO message group per entity. For - # other errors we use SQS message redelivery to take care of the - # retries. + # conflicts because we use an SQS FIFO message group per entity. + # Conflicts are common when writing replicas due to entities being + # shared between bundles. For other errors we use SQS message redelivery + # to take care of the retries. + limits = { + DocumentType.contribution: 1, + DocumentType.aggregate: 1, + DocumentType.replica: config.replica_conflict_limit + } return IndexWriter(catalog, self.catalogued_field_types(), refresh=False, - conflict_retry_limit=1, + conflict_retry_limit=limits[doc_type], error_retry_limit=0) @@ -855,8 +856,6 @@ def write(self, documents: list[Document]): def _write_individually(self, documents: Iterable[Document]): log.info('Writing documents individually') for doc in documents: - if isinstance(doc, Replica): - assert doc.version_type is VersionType.create_only, doc try: method = getattr(self.es_client, doc.op_type.name) method(refresh=self.refresh, **doc.to_index(self.catalog, self.field_types)) diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py index 1a9db1c86..cd8ab6fae 100644 --- a/src/azul/indexer/transform.py +++ b/src/azul/indexer/transform.py @@ -125,14 +125,18 @@ def _contribution(self, source=self.bundle.fqid.source, contents=contents) - def _replica(self, contents: MutableJSON, entity: EntityReference) -> Replica: + def _replica(self, + contents: MutableJSON, + entity: EntityReference, + hub_ids: list[EntityID] + ) -> Replica: coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(), entity=entity) return Replica(coordinates=coordinates, version=None, replica_type=self.replica_type(entity), contents=contents, - hub_ids=[]) + hub_ids=hub_ids) @classmethod @abstractmethod diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py index 106eadfde..5a62b64b3 100644 --- a/src/azul/plugins/metadata/anvil/indexer/transform.py +++ b/src/azul/plugins/metadata/anvil/indexer/transform.py @@ -186,11 +186,14 @@ def _transform(self, entity: EntityReference) -> Transform: def _add_replica(self, contribution: Optional[MutableJSON], entity: EntityReference, + hub_ids: list[EntityID] ) -> Transform: no_replica = not config.enable_replicas or self.entity_type() == 'bundles' return ( None if contribution is None else self._contribution(contribution, entity), - None if no_replica else self._replica(self.bundle.entities[entity], entity) + None if no_replica else self._replica(self.bundle.entities[entity], + entity, + hub_ids) ) def _pluralize(self, entity_type: str) -> str: @@ -474,8 +477,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]: class SingletonTransformer(BaseTransformer, metaclass=ABCMeta): - def _transform(self, entity: EntityReference) -> Transform: - contents = dict( + def _contents(self) -> MutableJSON: + return dict( activities=self._entities(self._activity, chain.from_iterable( self._entities_by_type[activity_type] for activity_type in self._activity_polymorphic_types @@ -486,7 +489,6 @@ def _transform(self, entity: EntityReference) -> Transform: donors=self._entities(self._donor, self._entities_by_type['donor']), files=self._entities(self._file, self._entities_by_type['file']) ) - return self._add_replica(contents, entity) @classmethod def field_types(cls) -> FieldTypes: @@ -527,15 +529,17 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) + files = linked['file'] contents = dict( activities=[self._activity(entity)], biosamples=self._entities(self._biosample, linked['biosample']), datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, linked['file']), + files=self._entities(self._file, files), ) - return self._add_replica(contents, entity) + hub_ids = [f.entity_id for f in files] + return self._add_replica(contents, entity, hub_ids) class BiosampleTransformer(BaseTransformer): @@ -546,6 +550,7 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) + files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -555,15 +560,18 @@ def _transform(self, entity: EntityReference) -> Transform: datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=self._entities(self._donor, linked['donor']), - files=self._entities(self._file, linked['file']), + files=self._entities(self._file, files), ) - return self._add_replica(contents, entity) + hub_ids = [f.entity_id for f in files] + return self._add_replica(contents, entity, hub_ids) class DiagnosisTransformer(BaseTransformer): def _transform(self, entity: EntityReference) -> Transform: - return self._add_replica(None, entity) + files = self._linked_entities(entity)['file'] + hub_ids = [f.entity_id for f in files] + return self._add_replica(None, entity, hub_ids) @classmethod def entity_type(cls) -> EntityType: @@ -580,6 +588,11 @@ def _singleton(self) -> EntityReference: return EntityReference(entity_type='bundle', entity_id=self.bundle.uuid) + def _transform(self, entity: EntityReference) -> Transform: + contents = self._contents() + hub_ids = [f.entity_id for f in self._entities_by_type['file']] + return self._add_replica(contents, entity, hub_ids) + class DatasetTransformer(SingletonTransformer): @@ -590,6 +603,18 @@ def entity_type(cls) -> str: def _singleton(self) -> EntityReference: return self._only_dataset() + def _transform(self, entity: EntityReference) -> Transform: + contents = self._contents() + # Every file in a snapshot is linked to that snapshot's singular + # dataset, making an explicit list of hub IDs for the dataset both + # redundant and impractically large (we observe that for large + # snapshots, trying to track this many files in a single data structure + # causes a prohibitively high rate of conflicts during replica updates). + # Therefore, we leave the hub IDs field empty for datasets and rely on + # the tenet that every file is an implicit hub of its parent dataset. + hub_ids = [] + return self._add_replica(contents, entity, hub_ids) + class DonorTransformer(BaseTransformer): @@ -599,6 +624,7 @@ def entity_type(cls) -> str: def _transform(self, entity: EntityReference) -> Transform: linked = self._linked_entities(entity) + files = linked['file'] contents = dict( activities=self._entities(self._activity, chain.from_iterable( linked[activity_type] @@ -608,9 +634,10 @@ def _transform(self, entity: EntityReference) -> Transform: datasets=[self._dataset(self._only_dataset())], diagnoses=self._entities(self._diagnosis, linked['diagnosis']), donors=[self._donor(entity)], - files=self._entities(self._file, linked['file']), + files=self._entities(self._file, files), ) - return self._add_replica(contents, entity) + hub_ids = [f.entity_id for f in files] + return self._add_replica(contents, entity, hub_ids) class FileTransformer(BaseTransformer): @@ -632,4 +659,8 @@ def _transform(self, entity: EntityReference) -> Transform: donors=self._entities(self._donor, linked['donor']), files=[self._file(entity)], ) - return self._add_replica(contents, entity) + # The result of the link traversal does not include the starting entity, + # so without this step the file itself wouldn't be included in its hubs + files = (entity, *linked['file']) + hub_ids = [f.entity_id for f in files] + return self._add_replica(contents, entity, hub_ids) diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py index 3dbed2c1d..ab1183453 100644 --- a/src/azul/plugins/metadata/hca/indexer/transform.py +++ b/src/azul/plugins/metadata/hca/indexer/transform.py @@ -503,7 +503,8 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]: def _add_replica(self, contribution: MutableJSON, - entity: Union[api.Entity, DatedEntity] + entity: Union[api.Entity, DatedEntity], + hub_ids: list[EntityID] ) -> Transform: entity_ref = EntityReference(entity_id=str(entity.document_id), entity_type=self.entity_type()) @@ -511,7 +512,7 @@ def _add_replica(self, replica = None else: assert isinstance(entity, api.Entity), entity - replica = self._replica(entity.json, entity_ref) + replica = self._replica(entity.json, entity_ref, hub_ids) return self._contribution(contribution, entity_ref), replica def _find_ancestor_samples(self, @@ -1465,7 +1466,8 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]: additional_contents = self.matrix_stratification_values(file) for entity_type, values in additional_contents.items(): contents[entity_type].extend(values) - yield self._add_replica(contents, file) + hub_ids = list(map(str, visitor.files)) + yield self._add_replica(contents, file, hub_ids) def matrix_stratification_values(self, file: api.File) -> JSON: """ @@ -1560,7 +1562,8 @@ def _transform(self, ), dates=[self._date(cell_suspension)], projects=[self._project(self._api_project)]) - yield self._add_replica(contents, cell_suspension) + hub_ids = list(map(str, visitor.files)) + yield self._add_replica(contents, cell_suspension, hub_ids) class SampleTransformer(PartitionedTransformer): @@ -1605,7 +1608,8 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]: ), dates=[self._date(sample)], projects=[self._project(self._api_project)]) - yield self._add_replica(contents, sample) + hub_ids = list(map(str, visitor.files)) + yield self._add_replica(contents, sample, hub_ids) class BundleAsEntity(DatedEntity): @@ -1705,7 +1709,12 @@ def _transform(self) -> Transform: contributed_analyses=contributed_analyses, dates=[self._date(self._singleton_entity())], projects=[self._project(self._api_project)]) - return self._add_replica(contents, self._singleton_entity()) + hub_ids = self._hub_ids(visitor) + return self._add_replica(contents, self._singleton_entity(), hub_ids) + + @abstractmethod + def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: + raise NotImplementedError class ProjectTransformer(SingletonTransformer): @@ -1717,6 +1726,14 @@ def _singleton_entity(self) -> DatedEntity: def entity_type(cls) -> str: return 'projects' + def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: + # Every file in a snapshot is linked to that snapshot's singular + # project, making an explicit list of hub IDs for the project both + # redundant and impractically large. Therefore, we leave the hub IDs + # field empty for projects and rely on the tenet that every file is an + # implicit hub of its parent project. + return [] + class BundleTransformer(SingletonTransformer): @@ -1733,3 +1750,6 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]: @classmethod def entity_type(cls) -> str: return 'bundles' + + def _hub_ids(self, visitor: TransformerVisitor) -> list[str]: + return list(map(str, visitor.files)) diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py index c72d7fabe..b7e199c1f 100644 --- a/test/indexer/__init__.py +++ b/test/indexer/__init__.py @@ -70,8 +70,11 @@ class ForcedRefreshIndexService(IndexService): - def _create_writer(self, catalog: Optional[CatalogName]) -> IndexWriter: - writer = super()._create_writer(catalog) + def _create_writer(self, + doc_type: DocumentType, + catalog: Optional[CatalogName] + ) -> IndexWriter: + writer = super()._create_writer(doc_type, catalog) # With a single client thread, refresh=True is faster than # refresh="wait_for". The latter would limit the request rate to # 1/refresh_interval. That's only one request per second with diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json index 66134fba1..352613383 100644 --- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json +++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json @@ -438,7 +438,9 @@ "_source": { "entity_id": "1509ef40-d1ba-440d-b298-16b7c173dcd4", "replica_type": "anvil_sequencingactivity", - "hub_ids": [], + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6" + ], "contents": { "activity_type": "Sequencing", "assay_type": [], @@ -868,7 +870,9 @@ "_source": { "entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", "replica_type": "anvil_file", - "hub_ids": [], + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6" + ], "contents": { "data_modality": [], "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6", @@ -898,7 +902,10 @@ "_source": { "entity_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f", "replica_type": "anvil_diagnosis", - "hub_ids": [], + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ], "contents": { "datarepo_row_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f", "diagnosis_age_lower_bound": null, @@ -1747,7 +1754,9 @@ "crc32": "" }, "replica_type": "anvil_file", - "hub_ids": [] + "hub_ids": [ + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ] } }, { @@ -2189,7 +2198,9 @@ "_source": { "entity_id": "816e364e-1193-4e5b-a91a-14e4b009157c", "replica_type": "anvil_sequencingactivity", - "hub_ids": [], + "hub_ids": [ + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ], "contents": { "activity_type": "Sequencing", "assay_type": [], @@ -2935,7 +2946,10 @@ "_source": { "entity_id": "826dea02-e274-4ffe-aabc-eb3db63ad068", "replica_type": "anvil_biosample", - "hub_ids": [], + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ], "contents": { "anatomical_site": null, "apriori_cell_type": [], @@ -3511,7 +3525,10 @@ "_source": { "entity_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461", "replica_type": "anvil_diagnosis", - "hub_ids": [], + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ], "contents": { "datarepo_row_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461", "diagnosis_age_lower_bound": null, @@ -4099,7 +4116,10 @@ "version": "2022-06-01T00:00:00.000000Z" }, "replica_type": "anvil_donor", - "hub_ids": [] + "hub_ids": [ + "15b76f9c-6b46-433f-851d-34e89f1b9ba6", + "3b17377b-16b1-431c-9967-e5d01fc5923f" + ] } } ] diff --git a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json index 39c53d767..d8223b730 100644 --- a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json +++ b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json @@ -3386,7 +3386,9 @@ }, "entity_id": "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", "replica_type": "file", - "hub_ids": [] + "hub_ids": [ + "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb" + ] }, "_type": "_doc" }, @@ -3422,7 +3424,10 @@ }, "entity_id": "412898c5-5b9b-4907-b07c-e9b89666e204", "replica_type": "cell_suspension", - "hub_ids": [] + "hub_ids": [ + "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", + "70d1af4a-82c8-478a-8960-e9028b3616ca" + ] }, "_type": "_doc" }, @@ -3451,7 +3456,9 @@ }, "entity_id": "70d1af4a-82c8-478a-8960-e9028b3616ca", "replica_type": "file", - "hub_ids": [] + "hub_ids": [ + "70d1af4a-82c8-478a-8960-e9028b3616ca" + ] }, "_type": "_doc" }, @@ -3501,7 +3508,10 @@ }, "entity_id": "a21dc760-a500-4236-bcff-da34a0e873d2", "replica_type": "sample", - "hub_ids": [] + "hub_ids": [ + "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb", + "70d1af4a-82c8-478a-8960-e9028b3616ca" + ] }, "_type": "_doc" }, diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py index b15972482..7620e87be 100644 --- a/test/indexer/test_indexer.py +++ b/test/indexer/test_indexer.py @@ -61,6 +61,7 @@ EntityReference, EntityType, IndexName, + Replica, ReplicaCoordinates, null_bool, null_int, @@ -131,7 +132,7 @@ def metadata_plugin(self) -> MetadataPlugin: def _num_expected_replicas(self, num_contribs: int, - num_bundles: int = 1 + num_bundles: Optional[int] = None ) -> int: """ :param num_contribs: Number of contributions with distinct contents @@ -140,8 +141,10 @@ def _num_expected_replicas(self, entities :return: How many replicas the indices are expected to contain """ - # Bundle entities are not replicated - return max(0, num_contribs - num_bundles) if config.enable_replicas else 0 + if num_bundles is None: + num_bundles = 0 if num_contribs == 0 else 1 + # Bundle entities are not replicated. + return num_contribs - num_bundles if config.enable_replicas else 0 def _assert_hit_counts(self, hits: list[JSON], @@ -398,11 +401,8 @@ def test_duplicate_notification(self): expected_tallies_2[entity] = 1 self.assertEqual(expected_tallies_2, tallies_2) - # All writes were logged as overwrites, except one. There are 1 fewer - # replicas than contributions. - num_replicas = num_contributions - 2 if config.enable_replicas else 0 - self.assertEqual(num_contributions - 1 + num_replicas, - len(logs.output)) + # All writes were logged as overwrites, except one. + self.assertEqual(num_contributions - 1, len(logs.output)) message_re = re.compile(r'^WARNING:azul\.indexer\.index_service:' r'Document .* exists. ' r'Retrying with overwrite\.$') @@ -473,11 +473,13 @@ def _assert_index_counts(self, *, just_deletion: bool): self.assertEqual(len(actual_addition_contributions), num_expected_addition_contributions) self.assertEqual(len(actual_deletion_contributions), num_expected_deletion_contributions) self._assert_hit_counts(hits, + # Deletion notifications add deletion markers to the contributions index + # instead of removing the existing contributions. num_contribs=num_expected_addition_contributions + num_expected_deletion_contributions, num_aggs=num_expected_aggregates, - # Indexing the same contribution twice (regardless of whether either is a deletion) - # does not create a new replica, so we expect fewer replicas than contributions - num_replicas=self._num_expected_replicas(num_expected_deletion_contributions)) + # These deletion markers do not affect the number of replicas because we don't + # support deleting replicas. + num_replicas=self._num_expected_replicas(num_expected_addition_contributions)) def test_bundle_delete_downgrade(self): """ @@ -2108,6 +2110,30 @@ def test_disallow_manifest_column_joiner(self): with self.assertRaisesRegex(RequirementError, "'||' is disallowed"): self._index_bundle(bundle) + def test_replica_update(self): + contents = {'replica': {}} + coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(), + entity=CataloguedEntityReference(catalog=self.catalog, + entity_type='replica', + entity_id='foo')) + replica = Replica(version=None, + replica_type='file', + contents=contents, + hub_ids=[], + coordinates=coordinates) + + for case, hub_ids, expected_hub_ids in [ + ('New replica', ['1', '1'], ['1']), + ('Additional hub IDs', ['3', '2', '1'], ['1', '2', '3']), + ('Redundant hub IDs', ['1', '2'], ['1', '2', '3']) + ]: + with self.subTest(case): + replica.hub_ids[:] = hub_ids + self.index_service.replicate(self.catalog, [replica]) + hit = one(self._get_all_hits()) + self.assertEqual(hit['_id'], coordinates.document_id) + self.assertEqual(hit['_source']['hub_ids'], expected_hub_ids) + class TestIndexManagement(AzulUnitTestCase): From 65be6b3a7ec307e8a81d16cae1d3eae47ed0bce6 Mon Sep 17 00:00:00 2001 From: Hannes Schmidt Date: Thu, 21 Mar 2024 15:58:23 -0700 Subject: [PATCH 4/4] [H] Revert "[r] Index lm5 in LungMAP (#6068, PR #6071)" (#6081) This reverts commit b87af907c4b5862913b6ab99f829b3f1a1c47cbf, reversing changes made to 27c2170738c5c3eed5b8ad97b30902363e166408. --- deployments/prod/environment.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/deployments/prod/environment.py b/deployments/prod/environment.py index e73d5e076..b1adc8297 100644 --- a/deployments/prod/environment.py +++ b/deployments/prod/environment.py @@ -1053,11 +1053,6 @@ def mkdict(previous_catalog: dict[str, str], mksrc('datarepo-3f332829', 'lungmap_prod_f899709cae2c4bb988f0131142e6c7ec__20220310_20231207_lm4', 1), ])) -lm5_sources = mkdict(lm4_sources, 9, mkdelta([ - mksrc('datarepo-ac5f1124', 'lungmap_prod_1977dc4784144263a8706b0f207d8ab3__20240206_20240222_lm5', 1), - mksrc('datarepo-aa408857', 'lungmap_prod_fdadee7e209745d5bf81cc280bd8348e__20240206_20240222_lm5', 1), -])) - def env() -> Mapping[str, Optional[str]]: """ @@ -1102,8 +1097,7 @@ def env() -> Mapping[str, Optional[str]]: ('hca', 'dcp35', dcp35_sources), ('hca', 'dcp36', dcp36_sources), ('hca', 'pilot', pilot_sources), - ('lungmap', 'lm4', lm4_sources), - ('lungmap', 'lm5', lm5_sources) + ('lungmap', 'lm4', lm4_sources) ] for suffix, internal in [ ('', False), ('-it', True)