Skip to content

Commit

Permalink
Refactor indices to accomodate replicas (#5358)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Jun 30, 2023
1 parent 9971402 commit 9819b1a
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 80 deletions.
130 changes: 79 additions & 51 deletions src/azul/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from abc import (
ABCMeta,
)
from collections import (
ChainMap,
)
Expand Down Expand Up @@ -81,6 +84,33 @@ def cache_per_thread(f, /):
mutable_furl = furl


class IndexContentsType(metaclass=ABCMeta):
entity_type: str

def __str__(self):
return self.entity_type


class Transformations(IndexContentsType):

def __init__(self, /, entity_type: str):
Config.validate_entity_type(entity_type)
self.entity_type = entity_type

def __repr__(self):
return f'{type(self).__name__}({self.entity_type!r})'


class Contributions(Transformations):
pass


class Aggregates(Transformations):

def __str__(self):
return f'{self.entity_type}_aggregate'


class Config:
"""
See `environment` for documentation of these settings.
Expand Down Expand Up @@ -931,15 +961,13 @@ def integration_test_catalogs(self) -> Mapping[CatalogName, Catalog]:

def es_index_name(self,
catalog: CatalogName,
entity_type: str,
aggregate: bool
contents_type: IndexContentsType
) -> str:
return str(IndexName(prefix=self._index_prefix,
version=2,
deployment=self.deployment_stage,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate))
contents_type=contents_type))

def parse_es_index_name(self, index_name: str) -> 'IndexName':
"""
Expand Down Expand Up @@ -1487,56 +1515,60 @@ class IndexName:
#: The catalog the index belongs to or None for v1 indices.
catalog: Optional[CatalogName] = attr.ib(default=None)

#: The type of entities this index contains metadata about
entity_type: str

#: Whether the documents in the index are contributions or aggregates
aggregate: bool = False
#: The type of entities this index contains metadata about, and whether they
#: contributions, aggregates, or replicas.
contents_type: IndexContentsType

index_name_version_re: ClassVar[re.Pattern] = re.compile(r'v(\d+)')

@property
def entity_type(self) -> str:
return self.contents_type.entity_type

def __attrs_post_init__(self):
"""
>>> IndexName(prefix='azul', version=1, deployment='dev', entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> contents_type = Contributions('foo_bar')
>>> IndexName(prefix='azul', version=1, deployment='dev', contents_type=contents_type)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar'))
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=contents_type)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar'))
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=contents_type)
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar'))
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', entity_type='foo')
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: Version 1 prohibits a catalog name ('hca').
>>> IndexName(prefix='azul', version=2, deployment='dev', entity_type='foo')
>>> IndexName(prefix='azul', version=2, deployment='dev', contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: Version 2 requires a catalog name (None).
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog=None, entity_type='foo')
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog=None, contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: Version 2 requires a catalog name (None).
>>> IndexName(prefix='_', version=2, deployment='dev', catalog='foo', entity_type='bar')
>>> contents_type = Contributions('bar')
>>> IndexName(prefix='_', version=2, deployment='dev', catalog='foo', contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: Prefix '_' is to short, too long or contains invalid characters.
>>> IndexName(prefix='azul', version=2, deployment='_', catalog='foo', entity_type='bar')
>>> IndexName(prefix='azul', version=2, deployment='_', catalog='foo', contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: Deployment name '_' is to short, too long or contains invalid characters.
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='_', entity_type='bar')
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='_', contents_type=contents_type)
Traceback (most recent call last):
...
azul.RequirementError: ('Catalog name is invalid', '_')
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='foo', entity_type='_')
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='foo', contents_type=Contributions('_'))
Traceback (most recent call last):
...
azul.RequirementError: entity_type is either too short, too long or contains invalid characters: '_'
Expand All @@ -1562,19 +1594,19 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
Parse the name of an index from any deployment and any version of Azul.
>>> IndexName.parse('azul_foo_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo'))
>>> IndexName.parse('azul_foo_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=True)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Aggregates('foo'))
>>> IndexName.parse('azul_foo_bar_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo_bar'))
>>> IndexName.parse('azul_foo_bar_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=True)
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, contents_type=Aggregates('foo_bar'))
>>> IndexName.parse('good_foo_dev', expected_prefix='good')
IndexName(prefix='good', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
IndexName(prefix='good', version=1, deployment='dev', catalog=None, contents_type=Contributions('foo'))
>>> IndexName.parse('bad_foo_dev')
Traceback (most recent call last):
Expand All @@ -1592,19 +1624,19 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
azul.RequirementError: entity_type ... ''
>>> IndexName.parse('azul_v2_dev_main_foo')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=False)
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo'))
>>> IndexName.parse('azul_v2_dev_main_foo_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True)
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo'))
>>> IndexName.parse('azul_v2_dev_main_foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar'))
>>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True)
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo_bar'))
>>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True)
IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', contents_type=Aggregates('foo_bar'))
>>> IndexName.parse('azul_v2_staging__foo_bar__aggregate') # doctest: +ELLIPSIS
Traceback (most recent call last):
Expand Down Expand Up @@ -1633,54 +1665,51 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
*index_name, deployment = index_name
if index_name[-1] == 'aggregate':
*index_name, _ = index_name
aggregate = True
contents_type_cls = Aggregates
else:
aggregate = False
contents_type_cls = Contributions
entity_type = '_'.join(index_name)
Config.validate_entity_type(entity_type)
contents_type = contents_type_cls(entity_type)
return cls(prefix=prefix,
version=version,
deployment=deployment,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate)
contents_type=contents_type)

def __str__(self) -> str:
"""
>>> str(IndexName(version=1, deployment='dev', entity_type='foo'))
>>> str(IndexName(version=1, deployment='dev', contents_type=Contributions('foo')))
'azul_foo_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', contents_type=Aggregates('foo')))
'azul_foo_aggregate_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar'))
>>> str(IndexName(version=1, deployment='dev', contents_type=Contributions('foo_bar')))
'azul_foo_bar_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', contents_type=Aggregates('foo_bar')))
'azul_foo_bar_aggregate_dev'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo'))
>>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Contributions('foo')))
'azul_v2_dev_main_foo'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True))
>>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo')))
'azul_v2_dev_main_foo_aggregate'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar'))
>>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Contributions('foo_bar')))
'azul_v2_dev_main_foo_bar'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2, deployment='dev', catalog='main', contents_type=Aggregates('foo_bar')))
'azul_v2_dev_main_foo_bar_aggregate'
>>> str(IndexName(version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2, deployment='staging', catalog='hca', contents_type=Aggregates('foo_bar')))
'azul_v2_staging_hca_foo_bar_aggregate'
"""
aggregate = ['aggregate'] if self.aggregate else []
if self.version == 1:
require(self.catalog is None)
return '_'.join([
self.prefix,
self.entity_type,
*aggregate,
str(self.contents_type),
self.deployment
])
elif self.version == 2:
Expand All @@ -1690,8 +1719,7 @@ def __str__(self) -> str:
f'v{self.version}',
self.deployment,
self.catalog,
self.entity_type,
*aggregate,
str(self.contents_type),
])
else:
assert False, self.version
Expand Down
36 changes: 26 additions & 10 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
)

from azul import (
Aggregates,
CatalogName,
Contributions,
IndexContentsType,
IndexName,
config,
)
Expand Down Expand Up @@ -104,7 +107,6 @@ class DocumentCoordinates(Generic[E], metaclass=ABCMeta):
be generic in E, the type of EntityReference.
"""
entity: E
aggregate: bool

@property
def index_name(self) -> str:
Expand All @@ -115,8 +117,12 @@ def index_name(self) -> str:
"""
assert isinstance(self.entity, CataloguedEntityReference)
return config.es_index_name(catalog=self.entity.catalog,
entity_type=self.entity.entity_type,
aggregate=self.aggregate)
contents_type=self.index_contents_type)

@property
@abstractmethod
def index_contents_type(self) -> IndexContentsType:
raise NotImplementedError

@property
@abstractmethod
Expand All @@ -127,7 +133,13 @@ def document_id(self) -> str:
def from_hit(cls, hit: JSON) -> 'DocumentCoordinates[CataloguedEntityReference]':
index_name = config.parse_es_index_name(hit['_index'])
document_id = hit['_id']
subcls = AggregateCoordinates if index_name.aggregate else ContributionCoordinates
index_contents_type = index_name.contents_type
if isinstance(index_contents_type, Contributions):
subcls = ContributionCoordinates
elif isinstance(index_contents_type, Aggregates):
subcls = AggregateCoordinates
else:
assert False, index_contents_type
assert issubclass(subcls, cls)
return subcls._from_index(index_name, document_id)

Expand Down Expand Up @@ -158,7 +170,6 @@ def with_catalog(self,

@attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True)
class ContributionCoordinates(DocumentCoordinates[E], Generic[E]):
aggregate: bool = attr.ib(init=False, default=False)
bundle: BundleFQID
deleted: bool

Expand All @@ -179,6 +190,10 @@ def __attrs_post_init__(self):
concrete_type = type(self.bundle)
assert concrete_type is BundleFQID, concrete_type

@property
def index_contents_type(self) -> IndexContentsType:
return Contributions(self.entity.entity_type)

@property
def document_id(self) -> str:
return '_'.join((
Expand All @@ -194,7 +209,7 @@ def _from_index(cls,
document_id: str
) -> 'ContributionCoordinates[CataloguedEntityReference]':
entity_type = index_name.entity_type
assert index_name.aggregate is False
assert isinstance(index_name.contents_type, Contributions)
entity_id, bundle_uuid, bundle_version, deleted = document_id.split('_')
if deleted == 'deleted':
deleted = True
Expand Down Expand Up @@ -223,15 +238,18 @@ class AggregateCoordinates(DocumentCoordinates[CataloguedEntityReference]):
Document coordinates for aggregates. Aggregate coordinates always carry a
catalog.
"""
aggregate: bool = attr.ib(init=False, default=True)

@property
def index_contents_type(self) -> IndexContentsType:
return Aggregates(self.entity.entity_type)

@classmethod
def _from_index(cls,
index_name: IndexName,
document_id: str
) -> 'AggregateCoordinates':
assert isinstance(index_name.contents_type, Aggregates)
entity_type = index_name.entity_type
assert index_name.aggregate is True
return cls(entity=CataloguedEntityReference(catalog=index_name.catalog,
entity_type=entity_type,
entity_id=document_id))
Expand Down Expand Up @@ -940,7 +958,6 @@ class Contribution(Document[ContributionCoordinates[E]]):

def __attrs_post_init__(self):
assert isinstance(self.coordinates, ContributionCoordinates)
assert self.coordinates.aggregate is False

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
Expand Down Expand Up @@ -1018,7 +1035,6 @@ def __init__(self,

def __attrs_post_init__(self):
assert isinstance(self.coordinates, AggregateCoordinates)
assert self.coordinates.aggregate is True

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
Expand Down
Loading

0 comments on commit 9819b1a

Please sign in to comment.