diff --git a/src/azul/__init__.py b/src/azul/__init__.py index c839962cb..f321e014b 100644 --- a/src/azul/__init__.py +++ b/src/azul/__init__.py @@ -1,3 +1,6 @@ +from abc import ( + ABCMeta, +) from collections import ( ChainMap, ) @@ -81,6 +84,33 @@ def cache_per_thread(f, /): mutable_furl = furl +class IndexContentsType(metaclass=ABCMeta): + entity_type: str + + def __str__(self): + return self.entity_type + + +class Transformations(IndexContentsType): + + def __init__(self, /, entity_type: str): + Config.validate_entity_type(entity_type) + self.entity_type = entity_type + + def __repr__(self): + return f'{type(self).__name__}({self.entity_type!r})' + + +class Contributions(Transformations): + pass + + +class Aggregates(Transformations): + + def __str__(self): + return f'{self.entity_type}_aggregate' + + class Config: """ See `environment` for documentation of these settings. @@ -931,15 +961,13 @@ def integration_test_catalogs(self) -> Mapping[CatalogName, Catalog]: def es_index_name(self, catalog: CatalogName, - entity_type: str, - aggregate: bool + contents_type: IndexContentsType ) -> str: return str(IndexName(prefix=self._index_prefix, version=2, deployment=self.deployment_stage, catalog=catalog, - entity_type=entity_type, - aggregate=aggregate)) + contents_type=contents_type)) def parse_es_index_name(self, index_name: str) -> 'IndexName': """ @@ -1487,56 +1515,60 @@ class IndexName: #: The catalog the index belongs to or None for v1 indices. catalog: Optional[CatalogName] = attr.ib(default=None) - #: The type of entities this index contains metadata about - entity_type: str - - #: Whether the documents in the index are contributions or aggregates - aggregate: bool = False + #: The type of entities this index contains metadata about, and whether they + #: are contributions, aggregates, or replicas. 
+ contents_type: IndexContentsType index_name_version_re: ClassVar[re.Pattern] = re.compile(r'v(\d+)') + @property + def entity_type(self) -> str: + return self.contents_type.entity_type + def __attrs_post_init__(self): """ - >>> IndexName(prefix='azul', version=1, deployment='dev', entity_type='foo_bar') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False) + >>> contents_type = Contributions('foo_bar') + >>> IndexName(prefix='azul', version=1, deployment='dev', contents_type=contents_type) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar')) - >>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False) + >>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=contents_type) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar')) - >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar') - IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False) + >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=contents_type) + IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar')) - >>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', entity_type='foo') + >>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: Version 1 prohibits a catalog name ('hca'). 
- >>> IndexName(prefix='azul', version=2, deployment='dev', entity_type='foo') + >>> IndexName(prefix='azul', version=2, deployment='dev', contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: Version 2 requires a catalog name (None). - >>> IndexName(prefix='azul', version=2, deployment='dev', catalog=None, entity_type='foo') + >>> IndexName(prefix='azul', version=2, deployment='dev', catalog=None, contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: Version 2 requires a catalog name (None). - >>> IndexName(prefix='_', version=2, deployment='dev', catalog='foo', entity_type='bar') + >>> contents_type = Contributions('bar') + >>> IndexName(prefix='_', version=2, deployment='dev', catalog='foo', contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: Prefix '_' is to short, too long or contains invalid characters. - >>> IndexName(prefix='azul', version=2, deployment='_', catalog='foo', entity_type='bar') + >>> IndexName(prefix='azul', version=2, deployment='_', catalog='foo', contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: Deployment name '_' is to short, too long or contains invalid characters. - >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='_', entity_type='bar') + >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='_', contents_type=contents_type) Traceback (most recent call last): ... azul.RequirementError: ('Catalog name is invalid', '_') - >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='foo', entity_type='_') + >>> IndexName(prefix='azul', version=2, deployment='dev', catalog='foo', contents_type=Contributions('_')) Traceback (most recent call last): ... 
azul.RequirementError: entity_type is either too short, too long or contains invalid characters: '_' @@ -1562,19 +1594,19 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName': Parse the name of an index from any deployment and any version of Azul. >>> IndexName.parse('azul_foo_dev') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo')) >>> IndexName.parse('azul_foo_aggregate_dev') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=True) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Aggregates('foo')) >>> IndexName.parse('azul_foo_bar_dev') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar')) >>> IndexName.parse('azul_foo_bar_aggregate_dev') - IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=True) + IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Aggregates('foo_bar')) >>> IndexName.parse('good_foo_dev', expected_prefix='good') - IndexName(prefix='good', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False) + IndexName(prefix='good', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo')) >>> IndexName.parse('bad_foo_dev') Traceback (most recent call last): @@ -1592,19 +1624,19 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName': azul.RequirementError: entity_type ... 
'' >>> IndexName.parse('azul_v2_dev_main_foo') - IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=False) + IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo')) >>> IndexName.parse('azul_v2_dev_main_foo_aggregate') - IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True) + IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo')) >>> IndexName.parse('azul_v2_dev_main_foo_bar') - IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False) + IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar')) >>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate') - IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True) + IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo_bar')) >>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate') - IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True) + IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', contents_type=Aggregates('foo_bar')) >>> IndexName.parse('azul_v2_staging__foo_bar__aggregate') # doctest: +ELLIPSIS Traceback (most recent call last): @@ -1633,54 +1665,51 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName': *index_name, deployment = index_name if index_name[-1] == 'aggregate': *index_name, _ = index_name - aggregate = True + contents_type_cls = Aggregates else: - aggregate = False + contents_type_cls = Contributions entity_type = '_'.join(index_name) - Config.validate_entity_type(entity_type) + contents_type = contents_type_cls(entity_type) return cls(prefix=prefix, version=version, deployment=deployment, catalog=catalog, - 
entity_type=entity_type, - aggregate=aggregate) + contents_type=contents_type) def __str__(self) -> str: """ - >>> str(IndexName(version=1, deployment='dev', entity_type='foo')) + >>> str(IndexName(version=1, deployment='dev', contents_type=Contributions('foo'))) 'azul_foo_dev' - >>> str(IndexName(version=1, deployment='dev', entity_type='foo', aggregate=True)) + >>> str(IndexName(version=1, deployment='dev', contents_type=Aggregates('foo'))) 'azul_foo_aggregate_dev' - >>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar')) + >>> str(IndexName(version=1, deployment='dev', contents_type=Contributions('foo_bar'))) 'azul_foo_bar_dev' - >>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', aggregate=True)) + >>> str(IndexName(version=1, deployment='dev', contents_type=Aggregates('foo_bar'))) 'azul_foo_bar_aggregate_dev' - >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo')) + >>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Contributions('foo'))) 'azul_v2_dev_main_foo' - >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True)) + >>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo'))) 'azul_v2_dev_main_foo_aggregate' - >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar')) + >>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar'))) 'azul_v2_dev_main_foo_bar' - >>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True)) + >>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo_bar'))) 'azul_v2_dev_main_foo_bar_aggregate' - >>> str(IndexName(version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True)) + >>> str(IndexName(version=2, deployment='staging', catalog='hca', contents_type=Aggregates('foo_bar'))) 
'azul_v2_staging_hca_foo_bar_aggregate' """ - aggregate = ['aggregate'] if self.aggregate else [] if self.version == 1: require(self.catalog is None) return '_'.join([ self.prefix, - self.entity_type, - *aggregate, + str(self.contents_type), self.deployment ]) elif self.version == 2: @@ -1690,8 +1719,7 @@ def __str__(self) -> str: f'v{self.version}', self.deployment, self.catalog, - self.entity_type, - *aggregate, + str(self.contents_type), ]) else: assert False, self.version diff --git a/src/azul/indexer/document.py b/src/azul/indexer/document.py index fa4642281..e7396f744 100644 --- a/src/azul/indexer/document.py +++ b/src/azul/indexer/document.py @@ -30,7 +30,10 @@ ) from azul import ( + Aggregates, CatalogName, + Contributions, + IndexContentsType, IndexName, config, ) @@ -104,7 +107,6 @@ class DocumentCoordinates(Generic[E], metaclass=ABCMeta): be generic in E, the type of EntityReference. """ entity: E - aggregate: bool @property def index_name(self) -> str: @@ -115,8 +117,12 @@ def index_name(self) -> str: """ assert isinstance(self.entity, CataloguedEntityReference) return config.es_index_name(catalog=self.entity.catalog, - entity_type=self.entity.entity_type, - aggregate=self.aggregate) + contents_type=self.index_contents_type) + + @property + @abstractmethod + def index_contents_type(self) -> IndexContentsType: + raise NotImplementedError @property @abstractmethod @@ -127,7 +133,13 @@ def document_id(self) -> str: def from_hit(cls, hit: JSON) -> 'DocumentCoordinates[CataloguedEntityReference]': index_name = config.parse_es_index_name(hit['_index']) document_id = hit['_id'] - subcls = AggregateCoordinates if index_name.aggregate else ContributionCoordinates + index_contents_type = index_name.contents_type + if isinstance(index_contents_type, Contributions): + subcls = ContributionCoordinates + elif isinstance(index_contents_type, Aggregates): + subcls = AggregateCoordinates + else: + assert False, index_contents_type assert issubclass(subcls, cls) return 
subcls._from_index(index_name, document_id) @@ -158,7 +170,6 @@ def with_catalog(self, @attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True) class ContributionCoordinates(DocumentCoordinates[E], Generic[E]): - aggregate: bool = attr.ib(init=False, default=False) bundle: BundleFQID deleted: bool @@ -179,6 +190,10 @@ def __attrs_post_init__(self): concrete_type = type(self.bundle) assert concrete_type is BundleFQID, concrete_type + @property + def index_contents_type(self) -> IndexContentsType: + return Contributions(self.entity.entity_type) + @property def document_id(self) -> str: return '_'.join(( @@ -194,7 +209,7 @@ def _from_index(cls, document_id: str ) -> 'ContributionCoordinates[CataloguedEntityReference]': entity_type = index_name.entity_type - assert index_name.aggregate is False + assert isinstance(index_name.contents_type, Contributions) entity_id, bundle_uuid, bundle_version, deleted = document_id.split('_') if deleted == 'deleted': deleted = True @@ -223,15 +238,18 @@ class AggregateCoordinates(DocumentCoordinates[CataloguedEntityReference]): Document coordinates for aggregates. Aggregate coordinates always carry a catalog. 
""" - aggregate: bool = attr.ib(init=False, default=True) + + @property + def index_contents_type(self) -> IndexContentsType: + return Aggregates(self.entity.entity_type) @classmethod def _from_index(cls, index_name: IndexName, document_id: str ) -> 'AggregateCoordinates': + assert isinstance(index_name.contents_type, Aggregates) entity_type = index_name.entity_type - assert index_name.aggregate is True return cls(entity=CataloguedEntityReference(catalog=index_name.catalog, entity_type=entity_type, entity_id=document_id)) @@ -940,7 +958,6 @@ class Contribution(Document[ContributionCoordinates[E]]): def __attrs_post_init__(self): assert isinstance(self.coordinates, ContributionCoordinates) - assert self.coordinates.aggregate is False @classmethod def field_types(cls, field_types: FieldTypes) -> FieldTypes: @@ -1018,7 +1035,6 @@ def __init__(self, def __attrs_post_init__(self): assert isinstance(self.coordinates, AggregateCoordinates) - assert self.coordinates.aggregate is True @classmethod def field_types(cls, field_types: FieldTypes) -> FieldTypes: diff --git a/src/azul/indexer/index_service.py b/src/azul/indexer/index_service.py index 728766006..37eafb5b9 100644 --- a/src/azul/indexer/index_service.py +++ b/src/azul/indexer/index_service.py @@ -39,7 +39,9 @@ ) from azul import ( + Aggregates, CatalogName, + Contributions, cache, config, freeze, @@ -116,7 +118,7 @@ def repository_plugin(self, catalog: CatalogName) -> RepositoryPlugin: def settings(self, index_name) -> JSON: index_name = config.parse_es_index_name(index_name) - aggregate = index_name.aggregate + aggregate = isinstance(index_name.contents_type, Aggregates) catalog = index_name.catalog assert catalog is not None, catalog if config.catalogs[catalog].is_integration_test_catalog: @@ -164,10 +166,9 @@ def settings(self, index_name) -> JSON: def index_names(self, catalog: CatalogName) -> list[str]: return [ config.es_index_name(catalog=catalog, - entity_type=entity_type, - aggregate=aggregate) + 
contents_type=contents_cls(entity_type)) for entity_type in self.entity_types(catalog) - for aggregate in (False, True) + for contents_cls in [Contributions, Aggregates] ] def fetch_bundle(self, @@ -503,8 +504,7 @@ def _read_contributions(self, tallies: CataloguedTallies) -> list[CataloguedCont entity_ids_by_index: dict[str, MutableSet[str]] = defaultdict(set) for entity in tallies.keys(): index = config.es_index_name(catalog=entity.catalog, - entity_type=entity.entity_type, - aggregate=False) + contents_type=Contributions(entity.entity_type)) entity_ids_by_index[index].add(entity.entity_id) query = { diff --git a/src/azul/service/elasticsearch_service.py b/src/azul/service/elasticsearch_service.py index fba2349a7..6b59561bd 100644 --- a/src/azul/service/elasticsearch_service.py +++ b/src/azul/service/elasticsearch_service.py @@ -44,6 +44,7 @@ ) from azul import ( + Aggregates, CatalogName, cached_property, config, @@ -666,5 +667,4 @@ def create_request(self, catalog, entity_type) -> Search: """ return Search(using=self._es_client, index=config.es_index_name(catalog=catalog, - entity_type=entity_type, - aggregate=True)) + contents_type=Aggregates(entity_type))) diff --git a/test/indexer/__init__.py b/test/indexer/__init__.py index ea4ed8581..3132c5fea 100644 --- a/test/indexer/__init__.py +++ b/test/indexer/__init__.py @@ -149,8 +149,7 @@ def _load_canned_result(self, bundle_fqid: BundleFQID) -> MutableJSONs: for hit in expected_hits: index_name = IndexName.parse(hit['_index']) hit['_index'] = config.es_index_name(catalog=self.catalog, - entity_type=index_name.entity_type, - aggregate=index_name.aggregate) + contents_type=index_name.contents_type) return expected_hits @classmethod diff --git a/test/indexer/test_indexer.py b/test/indexer/test_indexer.py index 8f4b68114..d985f5159 100644 --- a/test/indexer/test_indexer.py +++ b/test/indexer/test_indexer.py @@ -37,6 +37,8 @@ ) from azul import ( + Aggregates, + Contributions, RequirementError, cached_property, config, 
@@ -206,7 +208,9 @@ def test_deletion(self): def _parse_index_name(self, hit) -> tuple[str, bool]: index_name = config.parse_es_index_name(hit['_index']) - return index_name.entity_type, index_name.aggregate + content_type = index_name.contents_type + aggregate = isinstance(content_type, Aggregates) + return content_type.entity_type, aggregate def test_duplicate_notification(self): # Contribute the bundle once @@ -1702,8 +1706,7 @@ def test_related_files_field_exclusion(self): # Check that the dynamic mapping has the related_files field disabled index = config.es_index_name(catalog=self.catalog, - entity_type='files', - aggregate=False) + contents_type=Contributions('files')) mapping = self.es_client.indices.get_mapping(index=index) contents = mapping[index]['mappings']['properties']['contents'] self.assertFalse(contents['properties']['files']['properties']['related_files']['enabled']) diff --git a/test/indexer/test_projects.py b/test/indexer/test_projects.py index 03ec3ad5c..f197ba840 100644 --- a/test/indexer/test_projects.py +++ b/test/indexer/test_projects.py @@ -5,6 +5,8 @@ ) from azul import ( + Aggregates, + Contributions, config, ) from azul.es import ( @@ -66,9 +68,9 @@ def test_hca_extraction(self): for aggregate in True, False: with self.subTest(aggregate=aggregate): def index_name(entity_type): + contents_cls = Aggregates if aggregate else Contributions return config.es_index_name(catalog=self.catalog, - entity_type=entity_type, - aggregate=aggregate) + contents_type=contents_cls(entity_type)) total_projects = self.es_client.count(index=index_name('projects')) # Three unique projects, six project contributions diff --git a/test/service/__init__.py b/test/service/__init__.py index dfa87141c..18e9758c7 100644 --- a/test/service/__init__.py +++ b/test/service/__init__.py @@ -31,6 +31,7 @@ LocalAppTestCase, ) from azul import ( + Aggregates, JSON, cached_property, config, @@ -179,8 +180,7 @@ def _add_docs(self, num_docs): @property def _index_name(self): 
return config.es_index_name(catalog=self.catalog, - entity_type='files', - aggregate=True) + contents_type=Aggregates('files')) class StorageServiceTestMixin: diff --git a/test/service/test_response.py b/test/service/test_response.py index cafcf525e..f5214b830 100644 --- a/test/service/test_response.py +++ b/test/service/test_response.py @@ -45,6 +45,7 @@ LocalAppTestCase, ) from azul import ( + Aggregates, cached_property, config, ) @@ -151,8 +152,7 @@ def _get_hits(self, entity_type: str, entity_id: str): } # Tests are assumed to only ever run with the azul dev index results = self.es_client.search(index=config.es_index_name(catalog=self.catalog, - entity_type=entity_type, - aggregate=True), + contents_type=Aggregates(entity_type)), body=body) return self._index_service.translate_fields(catalog=self.catalog, doc=[results['hits']['hits'][0]['_source']],