diff --git a/environment.py b/environment.py
index 2593ad371..6cbc89490 100644
--- a/environment.py
+++ b/environment.py
@@ -187,6 +187,10 @@ def env() -> Mapping[str, Optional[str]]:
         # Whether to create and populate an index for replica documents.
         'AZUL_ENABLE_REPLICAS': '1',
 
+        # Maximum number of conflicts to allow before giving up when writing
+        # replica documents.
+        'AZUL_REPLICA_CONFLICT_LIMIT': '10',
+
         # The name of the current deployment. This variable controls the name of
         # all cloud resources and is the main vehicle for isolating cloud
         # resources between deployments.
diff --git a/src/azul/__init__.py b/src/azul/__init__.py
index 4225f03fd..785a1bb77 100644
--- a/src/azul/__init__.py
+++ b/src/azul/__init__.py
@@ -765,6 +765,10 @@ def es_volume_size(self) -> int:
     def enable_replicas(self) -> bool:
         return self._boolean(self.environ['AZUL_ENABLE_REPLICAS'])
 
+    @property
+    def replica_conflict_limit(self) -> int:
+        return int(self.environ['AZUL_REPLICA_CONFLICT_LIMIT'])
+
     # Because this property is relatively expensive to produce and frequently
     # used we are applying aggressive caching here, knowing very well that
     # this eliminates the option to reconfigure the running process by
diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py
index ddfb7bb8f..bf06473db 100644
--- a/src/azul/indexer/document.py
+++ b/src/azul/indexer/document.py
@@ -1100,6 +1100,10 @@ class OpType(Enum):
     #: Remove the document from the index or fail if it does not exist
     delete = auto()
 
+    #: Modify a document in the index via a scripted update or create it if it
+    #: does not exist
+    update = auto()
+
 
 C = TypeVar('C', bound=DocumentCoordinates)
 
@@ -1326,11 +1330,7 @@ def to_index(self,
                 if op_type is OpType.delete else
                 {
                     '_source' if bulk else 'body':
-                        self.translate_fields(doc=self.to_json(),
-                                              field_types=field_types[coordinates.entity.catalog],
-                                              forward=True)
-                        if self.needs_translation else
-                        self.to_json()
+                        self._body(field_types[coordinates.entity.catalog])
                 }
             ),
             '_id' if bulk else 'id': self.coordinates.document_id
@@ -1361,6 +1361,14 @@ def op_type(self) -> OpType:
         raise NotImplementedError
 
+    def _body(self, field_types: FieldTypes) -> JSON:
+        body = self.to_json()
+        if self.needs_translation:
+            body = self.translate_fields(doc=body,
+                                          field_types=field_types,
+                                          forward=True)
+        return body
+
 
 class DocumentSource(SourceRef[SimpleSourceSpec, SourceRef]):
     pass
@@ -1537,10 +1545,6 @@ class Replica(Document[ReplicaCoordinates[E]]):
 
     hub_ids: list[EntityID]
 
-    #: The version_type attribute will change to VersionType.none if writing
-    #: to Elasticsearch fails with 409
-    version_type: VersionType = VersionType.create_only
-
     needs_translation: ClassVar[bool] = False
 
     def __attrs_post_init__(self):
@@ -1555,11 +1559,28 @@ def field_types(cls, field_types: FieldTypes) -> FieldTypes:
     def to_json(self) -> JSON:
         return dict(super().to_json(),
                     replica_type=self.replica_type,
-                    hub_ids=self.hub_ids)
+                    # Ensure that index contents are deterministic for unit tests
+                    hub_ids=sorted(set(self.hub_ids)))
 
     @property
     def op_type(self) -> OpType:
-        return OpType.create
+        assert self.version_type is VersionType.none, self.version_type
+        return OpType.update
+
+    def _body(self, field_types: FieldTypes) -> JSON:
+        return {
+            'script': {
+                'source': '''
+                    Stream stream = Stream.concat(ctx._source.hub_ids.stream(),
+                                                  params.hub_ids.stream());
+                    ctx._source.hub_ids = stream.sorted().distinct().collect(Collectors.toList());
+                ''',
+                'params': {
+                    'hub_ids': self.hub_ids
+                }
+            },
+            'upsert': super()._body(field_types)
+        }
 
 
 CataloguedContribution = Contribution[CataloguedEntityReference]
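Note: for a replica that already exists, the scripted update above makes Elasticsearch merge the incoming hub IDs into the stored document instead of rejecting the write, while the 'upsert' clause covers the case where the document does not exist yet. The Painless script is, roughly, the Java-stream equivalent of the following Python sketch (illustrative only, not part of the patch):

    # Rough Python equivalent of the Painless merge in Replica._body above:
    # union the stored and incoming hub IDs, deduplicate, and sort, mirroring
    # Stream.concat(...).sorted().distinct().collect(Collectors.toList()).
    def merge_hub_ids(stored: list[str], incoming: list[str]) -> list[str]:
        return sorted(set(stored) | set(incoming))

    assert merge_hub_ids(['3', '1'], ['2', '1']) == ['1', '2', '3']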
""" # Use catalog specified in each tally - writer = self._create_writer(catalog=None) + writer = self._create_writer(DocumentType.aggregate, catalog=None) while True: # Read the aggregates old_aggregates = self._read_aggregates(tallies) @@ -505,33 +506,24 @@ def replicate(self, catalog: CatalogName, replicas: list[Replica] ) -> tuple[int, int]: - writer = self._create_writer(catalog) + writer = self._create_writer(DocumentType.replica, catalog) num_replicas = len(replicas) - num_written, num_present = 0, 0 + num_written = 0 while replicas: writer.write(replicas) retry_replicas = [] for r in replicas: if r.coordinates in writer.retries: - conflicts = writer.conflicts[r.coordinates] - if conflicts == 0: - retry_replicas.append(r) - elif conflicts == 1: - # FIXME: Track replica hub IDs - # https://github.com/DataBiosphere/azul/issues/5360 - writer.conflicts.pop(r.coordinates) - num_present += 1 - else: - assert False, (conflicts, r.coordinates) + retry_replicas.append(r) else: num_written += 1 replicas = retry_replicas writer.raise_on_errors() - assert num_written + num_present == num_replicas, ( - num_written, num_present, num_replicas + assert num_written == num_replicas, ( + num_written, num_replicas ) - return num_written, num_present + return num_written def _read_aggregates(self, entities: CataloguedTallies @@ -790,16 +782,25 @@ def _reconcile(self, for entity_type, entities in result.items() } - def _create_writer(self, catalog: Optional[CatalogName]) -> 'IndexWriter': + def _create_writer(self, + doc_type: DocumentType, + catalog: Optional[CatalogName] + ) -> 'IndexWriter': # We allow one conflict retry in the case of duplicate notifications and # switch from 'add' to 'update'. After that, there should be no - # conflicts because we use an SQS FIFO message group per entity. For - # other errors we use SQS message redelivery to take care of the - # retries. + # conflicts because we use an SQS FIFO message group per entity. + # Conflicts are common when writing replicas due to entities being + # shared between bundles. For other errors we use SQS message redelivery + # to take care of the retries. 
diff --git a/src/azul/indexer/transform.py b/src/azul/indexer/transform.py
index 1a9db1c86..cd8ab6fae 100644
--- a/src/azul/indexer/transform.py
+++ b/src/azul/indexer/transform.py
@@ -125,14 +125,18 @@ def _contribution(self,
                           source=self.bundle.fqid.source,
                           contents=contents)
 
-    def _replica(self, contents: MutableJSON, entity: EntityReference) -> Replica:
+    def _replica(self,
+                 contents: MutableJSON,
+                 entity: EntityReference,
+                 hub_ids: list[EntityID]
+                 ) -> Replica:
         coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
                                          entity=entity)
         return Replica(coordinates=coordinates,
                        version=None,
                        replica_type=self.replica_type(entity),
                        contents=contents,
-                       hub_ids=[])
+                       hub_ids=hub_ids)
 
     @classmethod
     @abstractmethod
diff --git a/src/azul/plugins/metadata/anvil/indexer/transform.py b/src/azul/plugins/metadata/anvil/indexer/transform.py
index 106eadfde..5a62b64b3 100644
--- a/src/azul/plugins/metadata/anvil/indexer/transform.py
+++ b/src/azul/plugins/metadata/anvil/indexer/transform.py
@@ -186,11 +186,14 @@ def _transform(self, entity: EntityReference) -> Transform:
     def _add_replica(self,
                      contribution: Optional[MutableJSON],
                      entity: EntityReference,
+                     hub_ids: list[EntityID]
                      ) -> Transform:
         no_replica = not config.enable_replicas or self.entity_type() == 'bundles'
         return (
             None if contribution is None else self._contribution(contribution, entity),
-            None if no_replica else self._replica(self.bundle.entities[entity], entity)
+            None if no_replica else self._replica(self.bundle.entities[entity],
+                                                  entity,
+                                                  hub_ids)
         )
 
     def _pluralize(self, entity_type: str) -> str:
@@ -474,8 +477,8 @@ def _complete_dataset_keys(cls) -> AbstractSet[str]:
 
 class SingletonTransformer(BaseTransformer, metaclass=ABCMeta):
 
-    def _transform(self, entity: EntityReference) -> Transform:
-        contents = dict(
+    def _contents(self) -> MutableJSON:
+        return dict(
             activities=self._entities(self._activity, chain.from_iterable(
                 self._entities_by_type[activity_type]
                 for activity_type in self._activity_polymorphic_types
@@ -486,7 +489,6 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, self._entities_by_type['donor']),
             files=self._entities(self._file, self._entities_by_type['file'])
         )
-        return self._add_replica(contents, entity)
 
     @classmethod
     def field_types(cls) -> FieldTypes:
@@ -527,15 +529,17 @@ def entity_type(cls) -> str:
 
     def _transform(self, entity: EntityReference) -> Transform:
         linked = self._linked_entities(entity)
+        files = linked['file']
         contents = dict(
             activities=[self._activity(entity)],
             biosamples=self._entities(self._biosample, linked['biosample']),
             datasets=[self._dataset(self._only_dataset())],
             diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
             donors=self._entities(self._donor, linked['donor']),
-            files=self._entities(self._file, linked['file']),
+            files=self._entities(self._file, files),
         )
-        return self._add_replica(contents, entity)
+        hub_ids = [f.entity_id for f in files]
+        return self._add_replica(contents, entity, hub_ids)
 
 
 class BiosampleTransformer(BaseTransformer):
@@ -546,6 +550,7 @@ def entity_type(cls) -> str:
 
     def _transform(self, entity: EntityReference) -> Transform:
         linked = self._linked_entities(entity)
+        files = linked['file']
         contents = dict(
             activities=self._entities(self._activity, chain.from_iterable(
                 linked[activity_type]
@@ -555,15 +560,18 @@ def _transform(self, entity: EntityReference) -> Transform:
             datasets=[self._dataset(self._only_dataset())],
             diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
             donors=self._entities(self._donor, linked['donor']),
-            files=self._entities(self._file, linked['file']),
+            files=self._entities(self._file, files),
         )
-        return self._add_replica(contents, entity)
+        hub_ids = [f.entity_id for f in files]
+        return self._add_replica(contents, entity, hub_ids)
 
 
 class DiagnosisTransformer(BaseTransformer):
 
     def _transform(self, entity: EntityReference) -> Transform:
-        return self._add_replica(None, entity)
+        files = self._linked_entities(entity)['file']
+        hub_ids = [f.entity_id for f in files]
+        return self._add_replica(None, entity, hub_ids)
 
     @classmethod
     def entity_type(cls) -> EntityType:
@@ -580,6 +588,11 @@ def _singleton(self) -> EntityReference:
         return EntityReference(entity_type='bundle',
                                entity_id=self.bundle.uuid)
 
+    def _transform(self, entity: EntityReference) -> Transform:
+        contents = self._contents()
+        hub_ids = [f.entity_id for f in self._entities_by_type['file']]
+        return self._add_replica(contents, entity, hub_ids)
+
 
 class DatasetTransformer(SingletonTransformer):
 
@@ -590,6 +603,18 @@ def entity_type(cls) -> str:
     def _singleton(self) -> EntityReference:
         return self._only_dataset()
 
+    def _transform(self, entity: EntityReference) -> Transform:
+        contents = self._contents()
+        # Every file in a snapshot is linked to that snapshot's singular
+        # dataset, making an explicit list of hub IDs for the dataset both
+        # redundant and impractically large (we observe that for large
+        # snapshots, trying to track this many files in a single data structure
+        # causes a prohibitively high rate of conflicts during replica updates).
+        # Therefore, we leave the hub IDs field empty for datasets and rely on
+        # the tenet that every file is an implicit hub of its parent dataset.
+        hub_ids = []
+        return self._add_replica(contents, entity, hub_ids)
+
 
 class DonorTransformer(BaseTransformer):
 
@@ -599,6 +624,7 @@ def entity_type(cls) -> str:
 
     def _transform(self, entity: EntityReference) -> Transform:
         linked = self._linked_entities(entity)
+        files = linked['file']
         contents = dict(
             activities=self._entities(self._activity, chain.from_iterable(
                 linked[activity_type]
@@ -608,9 +634,10 @@ def _transform(self, entity: EntityReference) -> Transform:
             datasets=[self._dataset(self._only_dataset())],
             diagnoses=self._entities(self._diagnosis, linked['diagnosis']),
             donors=[self._donor(entity)],
-            files=self._entities(self._file, linked['file']),
+            files=self._entities(self._file, files),
         )
-        return self._add_replica(contents, entity)
+        hub_ids = [f.entity_id for f in files]
+        return self._add_replica(contents, entity, hub_ids)
 
 
 class FileTransformer(BaseTransformer):
 
@@ -632,4 +659,8 @@ def _transform(self, entity: EntityReference) -> Transform:
             donors=self._entities(self._donor, linked['donor']),
             files=[self._file(entity)],
         )
-        return self._add_replica(contents, entity)
+        # The result of the link traversal does not include the starting entity,
+        # so without this step the file itself wouldn't be included in its hubs
+        files = (entity, *linked['file'])
+        hub_ids = [f.entity_id for f in files]
+        return self._add_replica(contents, entity, hub_ids)
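Note: the pattern repeated across these transformers is that a replica's hub IDs are the entity IDs of the files linked to the replica's entity; a file additionally counts itself, while datasets (and, for HCA, projects in the next file) deliberately leave the list empty because every file is an implicit hub of its dataset or project. A condensed sketch of that rule, using toy stand-ins rather than the real EntityReference and linkage API:

    # Illustrative only; `Ref` and the entity-type names stand in for the real types.
    from typing import NamedTuple

    class Ref(NamedTuple):
        entity_type: str
        entity_id: str

    def hub_ids_for(entity: Ref, linked_files: list[Ref]) -> list[str]:
        if entity.entity_type in ('dataset', 'project'):
            # Implicit hubs: leaving the list empty avoids a huge, conflict-prone
            # document that every file in the snapshot would have to update.
            return []
        files = [entity, *linked_files] if entity.entity_type == 'file' else linked_files
        return [f.entity_id for f in files]

    assert hub_ids_for(Ref('file', 'f1'), []) == ['f1']
    assert hub_ids_for(Ref('dataset', 'd1'), [Ref('file', 'f1')]) == []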
diff --git a/src/azul/plugins/metadata/hca/indexer/transform.py b/src/azul/plugins/metadata/hca/indexer/transform.py
index 3dbed2c1d..ab1183453 100644
--- a/src/azul/plugins/metadata/hca/indexer/transform.py
+++ b/src/azul/plugins/metadata/hca/indexer/transform.py
@@ -503,7 +504,8 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
 
     def _add_replica(self,
                      contribution: MutableJSON,
-                     entity: Union[api.Entity, DatedEntity]
+                     entity: Union[api.Entity, DatedEntity],
+                     hub_ids: list[EntityID]
                      ) -> Transform:
         entity_ref = EntityReference(entity_id=str(entity.document_id),
                                      entity_type=self.entity_type())
@@ -511,7 +512,7 @@ def _add_replica(self,
             replica = None
         else:
             assert isinstance(entity, api.Entity), entity
-            replica = self._replica(entity.json, entity_ref)
+            replica = self._replica(entity.json, entity_ref, hub_ids)
         return self._contribution(contribution, entity_ref), replica
 
     def _find_ancestor_samples(self,
@@ -1465,7 +1466,8 @@ def _transform(self, files: Iterable[api.File]) -> Iterable[Contribution]:
                 additional_contents = self.matrix_stratification_values(file)
                 for entity_type, values in additional_contents.items():
                     contents[entity_type].extend(values)
-                yield self._add_replica(contents, file)
+                hub_ids = list(map(str, visitor.files))
+                yield self._add_replica(contents, file, hub_ids)
 
     def matrix_stratification_values(self, file: api.File) -> JSON:
         """
@@ -1560,7 +1562,8 @@ def _transform(self,
                            ),
                            dates=[self._date(cell_suspension)],
                            projects=[self._project(self._api_project)])
-            yield self._add_replica(contents, cell_suspension)
+            hub_ids = list(map(str, visitor.files))
+            yield self._add_replica(contents, cell_suspension, hub_ids)
 
 
 class SampleTransformer(PartitionedTransformer):
@@ -1605,7 +1608,8 @@ def _transform(self, samples: Iterable[Sample]) -> Iterable[Contribution]:
                            ),
                            dates=[self._date(sample)],
                            projects=[self._project(self._api_project)])
-            yield self._add_replica(contents, sample)
+            hub_ids = list(map(str, visitor.files))
+            yield self._add_replica(contents, sample, hub_ids)
 
 
 class BundleAsEntity(DatedEntity):
@@ -1705,7 +1709,12 @@ def _transform(self) -> Transform:
                        contributed_analyses=contributed_analyses,
                        dates=[self._date(self._singleton_entity())],
                        projects=[self._project(self._api_project)])
-        return self._add_replica(contents, self._singleton_entity())
+        hub_ids = self._hub_ids(visitor)
+        return self._add_replica(contents, self._singleton_entity(), hub_ids)
+
+    @abstractmethod
+    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
+        raise NotImplementedError
 
 
 class ProjectTransformer(SingletonTransformer):
@@ -1717,6 +1726,14 @@ def _singleton_entity(self) -> DatedEntity:
     def entity_type(cls) -> str:
         return 'projects'
 
+    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
+        # Every file in a snapshot is linked to that snapshot's singular
+        # project, making an explicit list of hub IDs for the project both
+        # redundant and impractically large. Therefore, we leave the hub IDs
+        # field empty for projects and rely on the tenet that every file is an
+        # implicit hub of its parent project.
+        return []
+
 
 class BundleTransformer(SingletonTransformer):
 
@@ -1733,3 +1750,6 @@ def aggregator(cls, entity_type: EntityType) -> Optional[EntityAggregator]:
     @classmethod
     def entity_type(cls) -> str:
         return 'bundles'
+
+    def _hub_ids(self, visitor: TransformerVisitor) -> list[str]:
+        return list(map(str, visitor.files))
diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py
index c72d7fabe..b7e199c1f 100644
--- a/test/indexer/__init__.py
+++ b/test/indexer/__init__.py
@@ -70,8 +70,11 @@
 
 class ForcedRefreshIndexService(IndexService):
 
-    def _create_writer(self, catalog: Optional[CatalogName]) -> IndexWriter:
-        writer = super()._create_writer(catalog)
+    def _create_writer(self,
+                       doc_type: DocumentType,
+                       catalog: Optional[CatalogName]
+                       ) -> IndexWriter:
+        writer = super()._create_writer(doc_type, catalog)
         # With a single client thread, refresh=True is faster than
         # refresh="wait_for". The latter would limit the request rate to
         # 1/refresh_interval. That's only one request per second with
diff --git a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
index 66134fba1..352613383 100644
--- a/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
+++ b/test/indexer/data/826dea02-e274-affe-aabc-eb3db63ad068.results.json
@@ -438,7 +438,9 @@
             "_source": {
                 "entity_id": "1509ef40-d1ba-440d-b298-16b7c173dcd4",
                 "replica_type": "anvil_sequencingactivity",
-                "hub_ids": [],
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6"
+                ],
                 "contents": {
                     "activity_type": "Sequencing",
                     "assay_type": [],
@@ -868,7 +870,9 @@
             "_source": {
                 "entity_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
                 "replica_type": "anvil_file",
-                "hub_ids": [],
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6"
+                ],
                 "contents": {
                     "data_modality": [],
                     "datarepo_row_id": "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
@@ -898,7 +902,10 @@
             "_source": {
                 "entity_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f",
                 "replica_type": "anvil_diagnosis",
-                "hub_ids": [],
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ],
                 "contents": {
                     "datarepo_row_id": "15d85d30-ad4a-4f50-87a8-a27f59dd1b5f",
                     "diagnosis_age_lower_bound": null,
@@ -1747,7 +1754,9 @@
                     "crc32": ""
                 },
                 "replica_type": "anvil_file",
-                "hub_ids": []
+                "hub_ids": [
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ]
             }
         },
         {
@@ -2189,7 +2198,9 @@
             "_source": {
                 "entity_id": "816e364e-1193-4e5b-a91a-14e4b009157c",
                 "replica_type": "anvil_sequencingactivity",
-                "hub_ids": [],
+                "hub_ids": [
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ],
                 "contents": {
                     "activity_type": "Sequencing",
                     "assay_type": [],
@@ -2935,7 +2946,10 @@
             "_source": {
                 "entity_id": "826dea02-e274-4ffe-aabc-eb3db63ad068",
                 "replica_type": "anvil_biosample",
-                "hub_ids": [],
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ],
                 "contents": {
                     "anatomical_site": null,
                     "apriori_cell_type": [],
@@ -3511,7 +3525,10 @@
             "_source": {
                 "entity_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461",
                 "replica_type": "anvil_diagnosis",
-                "hub_ids": [],
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ],
                 "contents": {
                     "datarepo_row_id": "939a4bd3-86ed-4a8a-81f4-fbe0ee673461",
                     "diagnosis_age_lower_bound": null,
@@ -4099,7 +4116,10 @@
                     "version": "2022-06-01T00:00:00.000000Z"
                 },
                 "replica_type": "anvil_donor",
-                "hub_ids": []
+                "hub_ids": [
+                    "15b76f9c-6b46-433f-851d-34e89f1b9ba6",
+                    "3b17377b-16b1-431c-9967-e5d01fc5923f"
+                ]
             }
         }
     ]
diff --git a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
index 39c53d767..d8223b730 100644
--- a/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
+++ b/test/indexer/data/aaa96233-bf27-44c7-82df-b4dc15ad4d9d.2018-11-02T11:33:44.698028Z.results.json
@@ -3386,7 +3386,9 @@
             },
             "entity_id": "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
             "replica_type": "file",
-            "hub_ids": []
+            "hub_ids": [
+                "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb"
+            ]
         },
         "_type": "_doc"
     },
@@ -3422,7 +3424,10 @@
             },
             "entity_id": "412898c5-5b9b-4907-b07c-e9b89666e204",
             "replica_type": "cell_suspension",
-            "hub_ids": []
+            "hub_ids": [
+                "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
+                "70d1af4a-82c8-478a-8960-e9028b3616ca"
+            ]
         },
         "_type": "_doc"
     },
@@ -3451,7 +3456,9 @@
             },
             "entity_id": "70d1af4a-82c8-478a-8960-e9028b3616ca",
             "replica_type": "file",
-            "hub_ids": []
+            "hub_ids": [
+                "70d1af4a-82c8-478a-8960-e9028b3616ca"
+            ]
         },
         "_type": "_doc"
     },
@@ -3501,7 +3508,10 @@
             },
             "entity_id": "a21dc760-a500-4236-bcff-da34a0e873d2",
             "replica_type": "sample",
-            "hub_ids": []
+            "hub_ids": [
+                "0c5ac7c0-817e-40d4-b1b1-34c3d5cfecdb",
+                "70d1af4a-82c8-478a-8960-e9028b3616ca"
+            ]
         },
         "_type": "_doc"
     },
diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py
index b15972482..7620e87be 100644
--- a/test/indexer/test_indexer.py
+++ b/test/indexer/test_indexer.py
@@ -61,6 +61,7 @@
     EntityReference,
     EntityType,
     IndexName,
+    Replica,
     ReplicaCoordinates,
     null_bool,
     null_int,
@@ -131,7 +132,7 @@ def metadata_plugin(self) -> MetadataPlugin:
 
     def _num_expected_replicas(self,
                                num_contribs: int,
-                               num_bundles: int = 1
+                               num_bundles: Optional[int] = None
                                ) -> int:
         """
         :param num_contribs: Number of contributions with distinct contents
@@ -140,8 +141,10 @@ def _num_expected_replicas(self,
                             entities
         :return: How many replicas the indices are expected to contain
         """
-        # Bundle entities are not replicated
-        return max(0, num_contribs - num_bundles) if config.enable_replicas else 0
+        if num_bundles is None:
+            num_bundles = 0 if num_contribs == 0 else 1
+        # Bundle entities are not replicated.
+        return num_contribs - num_bundles if config.enable_replicas else 0
 
     def _assert_hit_counts(self,
                            hits: list[JSON],
@@ -398,11 +401,8 @@ def test_duplicate_notification(self):
            expected_tallies_2[entity] = 1
         self.assertEqual(expected_tallies_2, tallies_2)
 
-        # All writes were logged as overwrites, except one. There are 1 fewer
-        # replicas than contributions.
-        num_replicas = num_contributions - 2 if config.enable_replicas else 0
-        self.assertEqual(num_contributions - 1 + num_replicas,
-                         len(logs.output))
+        # All writes were logged as overwrites, except one.
+        self.assertEqual(num_contributions - 1, len(logs.output))
         message_re = re.compile(r'^WARNING:azul\.indexer\.index_service:'
                                 r'Document .* exists. '
                                 r'Retrying with overwrite\.$')
@@ -473,11 +473,13 @@ def _assert_index_counts(self, *, just_deletion: bool):
         self.assertEqual(len(actual_addition_contributions), num_expected_addition_contributions)
         self.assertEqual(len(actual_deletion_contributions), num_expected_deletion_contributions)
         self._assert_hit_counts(hits,
+                                # Deletion notifications add deletion markers to the contributions index
+                                # instead of removing the existing contributions.
                                 num_contribs=num_expected_addition_contributions + num_expected_deletion_contributions,
                                 num_aggs=num_expected_aggregates,
-                                # Indexing the same contribution twice (regardless of whether either is a deletion)
-                                # does not create a new replica, so we expect fewer replicas than contributions
-                                num_replicas=self._num_expected_replicas(num_expected_deletion_contributions))
+                                # These deletion markers do not affect the number of replicas because we don't
+                                # support deleting replicas.
+                                num_replicas=self._num_expected_replicas(num_expected_addition_contributions))
 
     def test_bundle_delete_downgrade(self):
         """
@@ -2108,6 +2110,30 @@ def test_disallow_manifest_column_joiner(self):
         with self.assertRaisesRegex(RequirementError, "'||' is disallowed"):
             self._index_bundle(bundle)
 
+    def test_replica_update(self):
+        contents = {'replica': {}}
+        coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
+                                         entity=CataloguedEntityReference(catalog=self.catalog,
+                                                                          entity_type='replica',
+                                                                          entity_id='foo'))
+        replica = Replica(version=None,
+                          replica_type='file',
+                          contents=contents,
+                          hub_ids=[],
+                          coordinates=coordinates)
+
+        for case, hub_ids, expected_hub_ids in [
+            ('New replica', ['1', '1'], ['1']),
+            ('Additional hub IDs', ['3', '2', '1'], ['1', '2', '3']),
+            ('Redundant hub IDs', ['1', '2'], ['1', '2', '3'])
+        ]:
+            with self.subTest(case):
+                replica.hub_ids[:] = hub_ids
+                self.index_service.replicate(self.catalog, [replica])
+                hit = one(self._get_all_hits())
+                self.assertEqual(hit['_id'], coordinates.document_id)
+                self.assertEqual(hit['_source']['hub_ids'], expected_hub_ids)
+
 
 class TestIndexManagement(AzulUnitTestCase):
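Note: the three subtests above run against one and the same persistent document, so the expected hub IDs are cumulative: each replicate() call merges the newly supplied IDs into whatever the previous subtest stored, which is why the last case still expects all three IDs even though it only sends two. A rough pure-Python rendition of that expectation (illustrative only):

    # Cumulative effect of the three replicate() calls in test_replica_update:
    stored: set[str] = set()
    for sent, expected in [(['1', '1'], ['1']),
                           (['3', '2', '1'], ['1', '2', '3']),
                           (['1', '2'], ['1', '2', '3'])]:
        stored |= set(sent)                # the Painless script unions the hub IDs ...
        assert sorted(stored) == expected  # ... and stores them sorted and distinct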