Skip to content

Commit

Permalink
Refactor indices to accomodate replicas (#5358)
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Sep 15, 2023
1 parent f81436b commit 51471c6
Show file tree
Hide file tree
Showing 9 changed files with 357 additions and 191 deletions.
190 changes: 142 additions & 48 deletions src/azul/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
)
from enum import (
Enum,
auto,
)
import functools
from itertools import (
Expand Down Expand Up @@ -81,6 +82,11 @@ def cache_per_thread(f, /):
mutable_furl = furl


class DocumentType(Enum):
contributions = auto()
aggregates = auto()


class Config:
"""
See `environment` for documentation of these settings.
Expand Down Expand Up @@ -938,14 +944,14 @@ def integration_test_catalogs(self) -> Mapping[CatalogName, Catalog]:
def es_index_name(self,
catalog: CatalogName,
entity_type: str,
aggregate: bool
doc_type: DocumentType
) -> str:
return str(IndexName(prefix=self._index_prefix,
version=2,
deployment=self.deployment_stage,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate))
doc_type=doc_type))

def parse_es_index_name(self, index_name: str) -> 'IndexName':
"""
Expand Down Expand Up @@ -1511,20 +1517,46 @@ class IndexName:
entity_type: str

#: Whether the documents in the index are contributions or aggregates
aggregate: bool = False
doc_type: DocumentType = DocumentType.contributions

index_name_version_re: ClassVar[re.Pattern] = re.compile(r'v(\d+)')

def __attrs_post_init__(self):
"""
>>> IndexName(prefix='azul', version=1, deployment='dev', entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
>>> IndexName(prefix='azul',
... version=1,
... deployment='dev',
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul',
... version=1,
... deployment='dev',
... catalog=None,
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul',
... version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName(prefix='azul', version=1, deployment='dev', catalog='hca', entity_type='foo')
Traceback (most recent call last):
Expand Down Expand Up @@ -1581,20 +1613,45 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
"""
Parse the name of an index from any deployment and any version of Azul.
>>> IndexName.parse('azul_foo_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_foo_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=True)
>>> IndexName.parse('azul_foo_bar_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=False)
>>> IndexName.parse('azul_foo_bar_aggregate_dev')
IndexName(prefix='azul', version=1, deployment='dev', catalog=None, entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('good_foo_dev', expected_prefix='good')
IndexName(prefix='good', version=1, deployment='dev', catalog=None, entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_foo_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_foo_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_foo_bar_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_foo_bar_aggregate_dev') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=1,
deployment='dev',
catalog=None,
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('good_foo_dev', expected_prefix='good') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='good',
version=1,
deployment='dev',
catalog=None,
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('bad_foo_dev')
Traceback (most recent call last):
Expand All @@ -1611,20 +1668,45 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
...
azul.RequirementError: entity_type ... ''
>>> IndexName.parse('azul_v2_dev_main_foo')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=False)
>>> IndexName.parse('azul_v2_dev_main_foo_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True)
>>> IndexName.parse('azul_v2_dev_main_foo_bar')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=False)
>>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate')
IndexName(prefix='azul', version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True)
>>> IndexName.parse('azul_v2_dev_main_foo') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_v2_dev_main_foo_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_dev_main_foo_bar') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.contributions: 1>)
>>> IndexName.parse('azul_v2_dev_main_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='dev',
catalog='main',
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_staging_hca_foo_bar_aggregate') # doctest: +NORMALIZE_WHITESPACE
IndexName(prefix='azul',
version=2,
deployment='staging',
catalog='hca',
entity_type='foo_bar',
doc_type=<DocumentType.aggregates: 2>)
>>> IndexName.parse('azul_v2_staging__foo_bar__aggregate') # doctest: +ELLIPSIS
Traceback (most recent call last):
Expand Down Expand Up @@ -1653,48 +1735,60 @@ def parse(cls, index_name, expected_prefix=prefix) -> 'IndexName':
*index_name, deployment = index_name
if index_name[-1] == 'aggregate':
*index_name, _ = index_name
aggregate = True
doc_type = DocumentType.aggregates
else:
aggregate = False
doc_type = DocumentType.contributions
entity_type = '_'.join(index_name)
Config.validate_entity_type(entity_type)
return cls(prefix=prefix,
version=version,
deployment=deployment,
catalog=catalog,
entity_type=entity_type,
aggregate=aggregate)
doc_type=doc_type)

def __str__(self) -> str:
"""
>>> str(IndexName(version=1, deployment='dev', entity_type='foo'))
'azul_foo_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', entity_type='foo', doc_type=DocumentType.aggregates))
'azul_foo_aggregate_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar'))
'azul_foo_bar_dev'
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=1, deployment='dev', entity_type='foo_bar', doc_type=DocumentType.aggregates))
'azul_foo_bar_aggregate_dev'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo'))
'azul_v2_dev_main_foo'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo', aggregate=True))
>>> str(IndexName(version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo',
... doc_type=DocumentType.aggregates))
'azul_v2_dev_main_foo_aggregate'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar'))
'azul_v2_dev_main_foo_bar'
>>> str(IndexName(version=2, deployment='dev', catalog='main', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2,
... deployment='dev',
... catalog='main',
... entity_type='foo_bar',
... doc_type=DocumentType.aggregates))
'azul_v2_dev_main_foo_bar_aggregate'
>>> str(IndexName(version=2, deployment='staging', catalog='hca', entity_type='foo_bar', aggregate=True))
>>> str(IndexName(version=2,
... deployment='staging',
... catalog='hca',
... entity_type='foo_bar',
... doc_type=DocumentType.aggregates))
'azul_v2_staging_hca_foo_bar_aggregate'
"""
aggregate = ['aggregate'] if self.aggregate else []
aggregate = ['aggregate'] if self.doc_type is DocumentType.aggregates else []
if self.version == 1:
require(self.catalog is None)
return '_'.join([
Expand Down
24 changes: 15 additions & 9 deletions src/azul/indexer/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

from azul import (
CatalogName,
DocumentType,
IndexName,
config,
)
Expand Down Expand Up @@ -107,7 +108,7 @@ class DocumentCoordinates(Generic[E], metaclass=ABCMeta):
be generic in E, the type of EntityReference.
"""
entity: E
aggregate: bool
doc_type: DocumentType

@property
def index_name(self) -> str:
Expand All @@ -119,7 +120,7 @@ def index_name(self) -> str:
assert isinstance(self.entity, CataloguedEntityReference)
return config.es_index_name(catalog=self.entity.catalog,
entity_type=self.entity.entity_type,
aggregate=self.aggregate)
doc_type=self.doc_type)

@property
@abstractmethod
Expand All @@ -132,7 +133,12 @@ def from_hit(cls,
) -> 'DocumentCoordinates[CataloguedEntityReference]':
index_name = config.parse_es_index_name(hit['_index'])
document_id = hit['_id']
subcls = AggregateCoordinates if index_name.aggregate else ContributionCoordinates
if index_name.doc_type is DocumentType.contributions:
subcls = ContributionCoordinates
elif index_name.doc_type is DocumentType.aggregates:
subcls = AggregateCoordinates
else:
assert False, index_name.doc_type
assert issubclass(subcls, cls)
return subcls._from_index(index_name, document_id)

Expand Down Expand Up @@ -164,7 +170,7 @@ def with_catalog(self,

@attr.s(frozen=True, auto_attribs=True, kw_only=True, slots=True)
class ContributionCoordinates(DocumentCoordinates[E], Generic[E]):
aggregate: bool = attr.ib(init=False, default=False)
doc_type: DocumentType = attr.ib(init=False, default=DocumentType.contributions)
bundle: BundleFQID
deleted: bool

Expand Down Expand Up @@ -200,7 +206,7 @@ def _from_index(cls,
document_id: str
) -> 'ContributionCoordinates[CataloguedEntityReference]':
entity_type = index_name.entity_type
assert index_name.aggregate is False
assert index_name.doc_type is DocumentType.contributions
entity_id, bundle_uuid, bundle_version, deleted = document_id.split('_')
if deleted == 'deleted':
deleted = True
Expand Down Expand Up @@ -229,15 +235,15 @@ class AggregateCoordinates(DocumentCoordinates[CataloguedEntityReference]):
Document coordinates for aggregates. Aggregate coordinates always carry a
catalog.
"""
aggregate: bool = attr.ib(init=False, default=True)
doc_type: DocumentType = attr.ib(init=False, default=DocumentType.aggregates)

@classmethod
def _from_index(cls,
index_name: IndexName,
document_id: str
) -> 'AggregateCoordinates':
entity_type = index_name.entity_type
assert index_name.aggregate is True
assert index_name.doc_type is DocumentType.aggregates
return cls(entity=CataloguedEntityReference(catalog=index_name.catalog,
entity_type=entity_type,
entity_id=document_id))
Expand Down Expand Up @@ -957,7 +963,7 @@ class Contribution(Document[ContributionCoordinates[E]]):

def __attrs_post_init__(self):
assert isinstance(self.coordinates, ContributionCoordinates)
assert self.coordinates.aggregate is False
assert self.coordinates.doc_type is DocumentType.contributions

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
Expand Down Expand Up @@ -1035,7 +1041,7 @@ def __init__(self,

def __attrs_post_init__(self):
assert isinstance(self.coordinates, AggregateCoordinates)
assert self.coordinates.aggregate is True
assert self.coordinates.doc_type is DocumentType.aggregates

@classmethod
def field_types(cls, field_types: FieldTypes) -> FieldTypes:
Expand Down
Loading

0 comments on commit 51471c6

Please sign in to comment.